diff --git a/kv_cache_benchmark/README.md b/kv_cache_benchmark/README.md
index b6757782..cfe986b0 100644
--- a/kv_cache_benchmark/README.md
+++ b/kv_cache_benchmark/README.md
@@ -25,7 +25,8 @@ A storage benchmarking tool for Large Language Model inference systems. This ben
 12. [Understanding Results](#understanding-results)
 13. [Unit Testing](#unit-testing)
 14. [Excel Export](#excel-export)
-15. [MLPerf Submission Guidelines](#mlperf-submission-guidelines)
-16. [Troubleshooting](#troubleshooting)
+15. [Block-Layer Latency Tracing](#block-layer-latency-tracing)
+16. [MLPerf Submission Guidelines](#mlperf-submission-guidelines)
+17. [Troubleshooting](#troubleshooting)
 
 ---
 
@@ -1498,10 +1499,39 @@ The test suite covers 23 component categories with ~170+ individual tests:
 | `TestPerTierPhaseMetrics` | 7 | Per-tier (GPU/CPU/Storage) KV bytes read/written tracking during prefill/decode phases |
 | `TestPerTierPhaseMetricsWithGPU` | 4 | GPU tier metrics tracking, phase-aware read/write separation (skipped without GPU) |
 
+### Visualize User Request Flow
+
+The `TestVisualizeUserRequestFlow` test class traces the complete I/O path of real requests through the benchmark; these are the tests to run when you want to understand exactly what the benchmark does at each step:
+
+```bash
+# Part 3: The 4 latency levels (L1-L4) with real NVMe timing
+pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part3_four_latency_levels -v -s
+
+# Part 3b: How requests become .npy files on disk
+pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part3b_request_to_npy_file_mapping -v -s
+
+# Part 3c: Multi-turn conversation I/O (triangular read pattern)
+pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part3c_multi_turn_prefill_decode_file_io -v -s
+
+# Part 3d: Multi-turn with eviction pressure (hits vs misses under LRU)
+pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part3d_multi_turn_with_eviction -v -s
+
+# Part 4: 3-tier waterfall LRU eviction cascade (GPU -> CPU -> 
NVMe -> DELETE)
+pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part4_three_tier_waterfall_eviction -v -s
+
+# Part 5: NVMe-only eviction (what happens when the drive fills up)
+pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part5_one_tier_nvme_only_eviction -v -s
+
+# Run all visualization tests at once
+pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow -v -s
+```
+
+Use `-s` to see the printed I/O traces; without it, pytest captures the output and you lose the visualization.
+
 ### Expected Runtime
 
-- **Without GPU**: ~5-10 seconds
-- **With GPU**: ~10-15 seconds
+- **Without GPU**: ~4-5 minutes (211 tests)
+- **With GPU**: ~5-6 minutes
 
 GPU tests are automatically skipped if CUDA is not available.
 
@@ -1553,6 +1583,106 @@ The Excel file contains a single row with all key metrics:
 
 ---
 
+## Block-Layer Latency Tracing
+
+The `--enable-latency-tracing` flag adds block-layer visibility to the benchmark: no code changes, no separate tooling, minimal overhead. It spawns bpftrace as a sudo subprocess, attaches to the kernel block-layer tracepoints during the benchmark run, and on completion distills the I/O profile into structured telemetry across stdout, JSON, and XLSX.
+
+This is the same class of telemetry that storage engineers use when characterizing production workloads; the difference is that it is fully integrated into the benchmark and the results are machine-readable.
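The telemetry stores each histogram as bpftrace-style buckets, and percentile summaries can be recovered by walking the cumulative counts. A minimal sketch of that bucket-percentile logic (the bucket values below are illustrative, not from a real run):

```python
# Sketch: recovering an approximate percentile from bpftrace-style
# histogram buckets. Bucket boundaries and counts are illustrative.
def hist_percentile(buckets, pct):
    """Return the (low, high) bucket containing the pct-th percentile sample."""
    total = sum(count for _, _, count in buckets)
    target = total * pct / 100.0
    cumulative = 0
    for low, high, count in buckets:
        cumulative += count
        if cumulative >= target:
            return (low, high)
    return buckets[-1][:2]

# (low_us, high_us, count) buckets from a hypothetical D2C read histogram
d2c_read = [(64, 128, 900), (128, 256, 80), (256, 512, 20)]
print(hist_percentile(d2c_read, 95))  # -> (128, 256)
```

Because buckets are ranges rather than raw samples, the result is a bracket such as "[128, 256) us" rather than an exact value, which matches how the P50/P95/P99 rows are reported.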
+ +### What It Captures + +15 histograms across the full I/O stack: + +| Category | Histograms | What It Tells You | +|----------|-----------|-------------------| +| Device hardware | D2C read/write | Per-NVMe-command completion time; this is what the SSD controller actually took | +| I/O scheduler | Q2D read/write | Time sitting in the Linux I/O scheduler queue before dispatch | +| Application visible | VFS read/write | Full syscall latency from the application's perspective | +| Serialization | write-to-fsync gap, fsync, fadvise-to-read gap | CPU vs device bottleneck decomposition | +| Block sizes | bssplit read/write | I/O size distribution at the kernel layer (matches MDTS splits) | +| Queue depth | In-flight at dispatch read/write | Instantaneous I/O concurrency at the moment of dispatch | +| Spatial | LBA heatmap read/write | Where on the device the I/O lands (10 GB linear buckets) | + +### Usage + +```bash +# Run benchmark with tracing (requires sudo for bpftrace) +kv-cache --config config.yaml --model llama3.1-8b \ + --num-users 10 --duration 30 \ + --gpu-mem-gb 0 --cpu-mem-gb 0 \ + --max-concurrent-allocs 1 \ + --generation-mode none \ + --cache-dir /mnt/nvme --seed 42 \ + --enable-latency-tracing \ + --xlsx-output results_traced.xlsx +``` + +The tracing output appears at the end of the benchmark results. The XLSX gets two additional sheets: **Device Tracing** (P50/P95/P99 summary per histogram) and **Trace Histograms** (raw bucket data for charting). + +### Standalone Tracing Against vLLM / llm-d + +The bpftrace scripts work independently of the benchmark. 
Point them at any inference engine process: + +```bash +# Trace vLLM and generate a fio workload +sudo ./utils/storage_latency_stack.sh vllm --fio + +# Trace llm-d +sudo ./utils/storage_latency_stack.sh llm-d --fio + +# Trace any process +sudo ./utils/storage_latency_stack.sh python3 + +# Manual distill from saved trace +python3 utils/distill_fio.py -i trace_output.txt --process vllm -o vllm_workload.ini +``` + +The `--fio` flag captures the bpftrace output and pipes it through `distill_fio.py` to generate a standalone fio workload file. This means you can trace vLLM on a production node, take the generated .ini file, and replay the exact I/O pattern on a bare-metal test rig with fio to compare drives without running the inference stack. + +### fio Workload Distiller + +When `--enable-latency-tracing` is used with the benchmark, or when `--fio` is passed to the shell wrapper, a fio .ini file is generated automatically. The fio config includes: + +- **bssplit** from the traced block size distribution (separate read/write splits) +- **rwmixread** from the read/write I/O count ratio +- **iodepth** from the in-flight I/O histogram P50 +- **thinktime** from the write-to-fsync serialization gap (idle time between I/O bursts) +- D2C latency summary and LBA hot zone in the header comments + +Example generated config: +```ini +[kv-cache-traced] +ioengine=libaio +direct=1 +time_based +runtime=300 +rw=randrw +rwmixread=87 +bssplit=4k/1:8k/1:16k/1:32k/1:64k/1:128k/100,4k/7:8k/1:16k/1:32k/4:64k/4:128k/83 +iodepth=2048 +iodepth_batch_submit=2048 +iodepth_batch_complete_min=1 +size=100G +thinktime=32 +thinktime_blocks=2048 +thinktime_iotime=1s +refill_buffers=1 +norandommap=1 +randrepeat=0 +numjobs=1 +group_reporting +percentile_list=50:95:99:99.9:99.99 +``` + +### Requirements + +- Linux 5.x+ with BTF support +- bpftrace 0.14+ (`sudo apt install bpftrace`) +- sudo or CAP_BPF privileges +- If bpftrace is not available, the flag degrades gracefully; the benchmark runs normally 
without tracing.
+
+---
+
 ## MLPerf Submission Guidelines
 
 For official MLPerf v3.0 storage submissions, use these standardized commands. **These invocations have been validated through extensive discovery testing** (1,411 Fast system tests, 268 Slow system tests comparing 14,000 MB/s vs 3,000 MB/s storage).
diff --git a/kv_cache_benchmark/docs/MLperf v3 KV cache proposal.md b/kv_cache_benchmark/docs/MLperf v3 KV cache proposal.md
index 0e632e59..d5ae8fc7 100644
--- a/kv_cache_benchmark/docs/MLperf v3 KV cache proposal.md
+++ b/kv_cache_benchmark/docs/MLperf v3 KV cache proposal.md
@@ -1981,6 +1981,92 @@ data = buffer[start:start+size].reshape(kv_shape)
 
 ---
 
+## 8.5 Block-Layer Latency Tracing & fio Workload Distiller
+
+The benchmark includes an integrated block-layer tracing capability that decomposes storage I/O across every layer of the Linux I/O stack, from the application (VFS) down to the NVMe controller (D2C). It is enabled with a single flag, requires no code changes or separate tooling, and adds minimal overhead to the benchmark run.
+
+### Motivation
+
+The L4 "device" latency reported by the benchmark measures the time to read or write an entire .npy file through NumPy. For large KV cache entries (500 MB to 2 GB), the kernel splits each file I/O into hundreds of NVMe commands at the MDTS boundary. The resulting P95 device latency reflects the total time to load a large entry; it includes both the actual NVMe hardware time and the numpy deserialization overhead within that single np.load() call. Without block-layer visibility, there is no way to distinguish how much of that latency is the drive vs the host.
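The fan-out from one file read to many NVMe commands is simple arithmetic. As an illustration (the MDTS value below is hypothetical; real drives report theirs in the Identify Controller data):

```python
# Sketch: one np.load() of a large KV entry fans out into many NVMe
# commands because the kernel splits block I/O at the drive's MDTS
# boundary. The 512 KB MDTS here is a hypothetical value.
entry_bytes = 500 * 1024**2            # 500 MB KV cache entry
mdts_bytes = 512 * 1024                # hypothetical max data transfer size
num_commands = -(-entry_bytes // mdts_bytes)  # ceiling division
print(num_commands)  # 1000 NVMe commands behind a single file read
```

Each of those commands gets its own D2C sample in the trace, which is why the block-layer histograms can separate drive time from host-side overhead even though the application sees only one read.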
+ +### Enabling Tracing + +```bash +kv-cache --config config.yaml --model llama3.1-8b \ + --num-users 10 --duration 30 \ + --gpu-mem-gb 0 --cpu-mem-gb 0 \ + --max-concurrent-allocs 1 \ + --generation-mode none \ + --cache-dir /mnt/nvme --seed 42 \ + --enable-latency-tracing \ + --xlsx-output results_traced.xlsx +``` + +The benchmark spawns bpftrace as a sudo subprocess before the run, attaches to 16 kernel tracepoints, and on completion sends SIGINT to collect the histogram data. The tracing subprocess runs in its own process group; the benchmark itself does not require root. + +### Histograms Captured + +| Histogram | Layer | What It Measures | +|-----------|-------|-----------------| +| D2C read/write | Device | Per-NVMe-command completion time (actual hardware latency) | +| Q2D read/write | I/O Scheduler | Time in the scheduler queue before dispatch to the NVMe driver | +| VFS read/write | Application | Full syscall time including page cache, filesystem, and block I/O | +| fsync | Device | Actual device flush latency after buffered writes | +| write-to-fsync gap | Serialization | CPU idle time between write() return and fsync() entry | +| fadvise-to-read gap | Cache mgmt | Overhead of page cache invalidation before reads | +| bssplit read/write | Block sizes | I/O size distribution at the kernel layer | +| Queue depth read/write | Concurrency | Instantaneous in-flight I/O count at the moment of dispatch | +| LBA heatmap read/write | Spatial | Where on the device the I/O lands (10 GB linear buckets) | + +### fio Workload Distiller + +When tracing is enabled, the benchmark automatically generates a standalone fio .ini file that reproduces the observed I/O pattern. 
The distiller extracts bssplit (block size distribution with separate read/write splits), rwmixread (from the I/O count ratio), iodepth (from the in-flight I/O histogram), and thinktime (from the write-to-fsync serialization gap) and writes them into a fio config that can be run independently against any device. + +Example output from a traced benchmark run on Kingston DC3000ME: + +```ini +[kv-cache-traced] +ioengine=libaio +direct=1 +time_based +runtime=300 +rw=randrw +rwmixread=87 +bssplit=4k/1:8k/1:16k/1:32k/1:64k/1:128k/100,4k/7:8k/1:16k/1:32k/4:64k/4:128k/83 +iodepth=2048 +iodepth_batch_submit=2048 +iodepth_batch_complete_min=1 +size=100G +thinktime=32 +thinktime_blocks=2048 +thinktime_iotime=1s +refill_buffers=1 +norandommap=1 +randrepeat=0 +numjobs=1 +group_reporting +percentile_list=50:95:99:99.9:99.99 +``` + +### Standalone Usage Against Inference Engines + +The tracing tools work independently of the benchmark. The shell wrapper and Python distiller can be pointed at any process: + +```bash +# Trace vLLM and generate fio workload +sudo ./utils/storage_latency_stack.sh vllm --fio + +# Trace llm-d +sudo ./utils/storage_latency_stack.sh llm-d --fio + +# Manual distill from saved trace output +python3 utils/distill_fio.py -i trace_output.txt --process vllm -o vllm_workload.ini +``` + +This means you can characterize the I/O profile of a real inference engine on a production node, take the generated fio .ini file to a test bench, and run it against multiple drives with fio to compare storage performance without deploying the full inference stack. + +--- + ## 9. 
Common Issues & Troubleshooting ### Issue: High Host Latency diff --git a/kv_cache_benchmark/kv_cache/benchmark.py b/kv_cache_benchmark/kv_cache/benchmark.py index 8219019b..a69790e0 100755 --- a/kv_cache_benchmark/kv_cache/benchmark.py +++ b/kv_cache_benchmark/kv_cache/benchmark.py @@ -12,9 +12,12 @@ import glob import time import queue +import signal import random import logging import threading +import subprocess +import re from typing import Dict, List, Optional, Tuple from datetime import datetime from concurrent.futures import ThreadPoolExecutor @@ -73,7 +76,8 @@ def __init__(self, trace_speedup: float = 1.0, replay_cycles: int = 0, prefill_only: bool = False, - decode_only: bool = False): + decode_only: bool = False, + enable_latency_tracing: bool = False): self.model_config = model_config self.num_users = num_users @@ -81,6 +85,8 @@ def __init__(self, self.duration = duration_seconds self.enable_autoscaling = enable_autoscaling self.enable_multi_turn = enable_multi_turn + self.enable_latency_tracing = enable_latency_tracing + self._trace_proc = None self.generation_mode = generation_mode self.ms_per_token = GENERATION_TIMING[generation_mode] * 1000 self.enable_prefix_caching = enable_prefix_caching @@ -419,6 +425,15 @@ def user_worker(user: UserProfile): self.user_conversations.pop(user.user_id, None) local_conv_id = None + # Enforce max_turns_per_conv hard cap + if local_conv_id: + with self.conversation_manager.lock: + state = self.conversation_manager.conversations.get(local_conv_id) + if state and state.turn_number >= self.conversation_manager.max_turns_per_conv: + with self.user_conversations_lock: + self.user_conversations.pop(user.user_id, None) + local_conv_id = None + if local_conv_id is None: local_conv_id = self.conversation_manager.start_conversation(user.user_id) with self.user_conversations_lock: @@ -514,17 +529,24 @@ def process_requests(self, stop_event: threading.Event): storage_latency += read_lat request.context_tokens = remaining_tokens 
- # Skip if decode_only mode (disaggregated decode node) + # Skip steps 2+3 entirely in decode_only mode: + # - Step 2 reads always miss (step 3 writes are skipped, so no entries exist) + # - Step 3 prefill writes don't apply to decode-only nodes if not self.decode_only: - # 2. For multi-turn conversations, access cache from previous turn. + # 2. For multi-turn conversations, reload all previous turns' KV cache. + # Reads every previous turn via access_cache (real I/O for entries + # that survived eviction; immediate (None, 0.0) for evicted entries). if self.conversation_manager and request.turn_number > 1: - prev_turn_key = f"{request.conversation_id}_turn_{request.turn_number - 1}" - location, read_latency = self.cache.access_cache(prev_turn_key, InferencePhase.DECODE, 'multi_turn') - if location is not None: - storage_latency += read_latency - with self.results_lock: self.results['multi_turn_cache_hits'] += 1 - else: - with self.results_lock: self.results['multi_turn_cache_misses'] += 1 + prev_keys = self.conversation_manager.get_all_previous_turn_keys( + request.conversation_id, request.turn_number + ) + for prev_turn_key in prev_keys: + location, read_latency = self.cache.access_cache(prev_turn_key, InferencePhase.DECODE, 'multi_turn') + if location is not None: + storage_latency += read_latency + with self.results_lock: self.results['multi_turn_cache_hits'] += 1 + else: + with self.results_lock: self.results['multi_turn_cache_misses'] += 1 # 3. Perform the main PREFILL operation (a cache WRITE). 
if request.phase == InferencePhase.PREFILL or request.phase == InferencePhase.PREFILL_DECODE: @@ -556,6 +578,8 @@ def process_requests(self, stop_event: threading.Event): decode_key = request.cache_key location, read_latency = self.cache.access_cache(decode_key, InferencePhase.DECODE, cache_type) + storage_latency += read_latency + decode_total_latency = read_latency if location is None: # Cache miss during decode - need to allocate (unless decode_only) @@ -572,8 +596,9 @@ def process_requests(self, stop_event: threading.Event): for _ in range(num_batched_reads): _, batch_read_latency = self.cache.access_cache(decode_key, InferencePhase.DECODE, cache_type) storage_latency += batch_read_latency + decode_total_latency += batch_read_latency - with self.results_lock: self.results['decode_latencies'].append(read_latency) + with self.results_lock: self.results['decode_latencies'].append(decode_total_latency) # 6. Simulate token generation time. generation_latency = request.generate_tokens * GENERATION_TIMING[self.generation_mode] @@ -668,6 +693,432 @@ def monitor_stats(self, stop_event: threading.Event): f"Throughput: {throughput:.2f} tok/s") last_log_time = now + def _start_latency_tracing(self): + """Spawn bpftrace as a sudo subprocess to trace block-layer device latency.""" + script_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'utils') + script_path = os.path.join(script_dir, 'storage_latency_stack.sh') + + if not os.path.exists(script_path): + logger.warning(f"Tracing script not found: {script_path}") + print(f" WARNING: {script_path} not found. 
Skipping latency tracing.") + return + + # Determine the process name to filter on + comm = os.path.basename(sys.argv[0]) if sys.argv[0] else 'python3' + + print(f"\n### LATENCY TRACING ###") + print(f" Script: {script_path}") + print(f" Filter: {comm}") + print(f" Spawning sudo bpftrace (you may be prompted for password)...") + + try: + self._trace_proc = subprocess.Popen( + ['sudo', script_path, comm], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=os.setsid # own process group for clean SIGINT + ) + # Give bpftrace time to attach probes + time.sleep(2) + if self._trace_proc.poll() is not None: + stderr = self._trace_proc.stderr.read().decode('utf-8', errors='replace') + logger.warning(f"bpftrace exited early: {stderr[:200]}") + print(f" WARNING: bpftrace failed to start. Continuing without tracing.") + self._trace_proc = None + else: + print(f" bpftrace attached (pid {self._trace_proc.pid}). Tracing active.") + except FileNotFoundError: + logger.warning("sudo or bpftrace not found") + print(f" WARNING: sudo/bpftrace not available. Skipping latency tracing.") + self._trace_proc = None + except Exception as e: + logger.warning(f"Failed to start tracing: {e}") + print(f" WARNING: Tracing failed: {e}. 
Continuing without tracing.") + self._trace_proc = None + + def _stop_latency_tracing(self) -> Optional[Dict]: + """Send SIGINT to bpftrace, capture histograms, parse into dict.""" + if not self._trace_proc: + return None + + print(f"\n### COLLECTING TRACE DATA ###") + try: + # SIGINT to the process group triggers bpftrace's END block + os.killpg(os.getpgid(self._trace_proc.pid), signal.SIGINT) + stdout, stderr = self._trace_proc.communicate(timeout=10) + except subprocess.TimeoutExpired: + os.killpg(os.getpgid(self._trace_proc.pid), signal.SIGKILL) + stdout, stderr = self._trace_proc.communicate() + except Exception as e: + logger.warning(f"Error stopping bpftrace: {e}") + return None + + # bpftrace may print histograms to stdout or stderr depending on version + stdout_text = stdout.decode('utf-8', errors='replace') + stderr_text = stderr.decode('utf-8', errors='replace') + output = stdout_text + '\n' + stderr_text + + if '@' not in output: + logger.warning(f"No histogram data in bpftrace output ({len(output)} bytes)") + logger.warning(f"Raw output: {repr(output[:500])}") + return {} + + return self._parse_bpftrace_output(output) + + def _parse_bpftrace_output(self, output: str) -> Dict: + """Parse bpftrace histogram output into structured dict.""" + result = {} + current_hist = None + lines = output.split('\n') + + for line in lines: + # Match histogram name like "@d2c_read_us:" or "@vfs_write_us:" + # bpftrace may prefix with newlines or spaces + hist_match = re.match(r'^\s*@(\w+):\s*$', line) + if hist_match: + current_hist = hist_match.group(1) + result[current_hist] = {'buckets': [], 'raw': []} + continue + + # Match histogram bucket like "[128, 256) 5 |@@@@ |" + bucket_match = re.match(r'^\[(\d+),\s*(\d+)\)\s+(\d+)\s+\|', line) + if bucket_match and current_hist: + low = int(bucket_match.group(1)) + high = int(bucket_match.group(2)) + count = int(bucket_match.group(3)) + result[current_hist]['buckets'].append({ + 'range_us': [low, high], + 'count': count + 
}) + result[current_hist]['raw'].append(line.rstrip()) + continue + + # Match single-value bucket like "[1M, 2M) 1 |@@@@ |" + bucket_match_k = re.match(r'^\[(\d+)([KM]),\s*(\d+)([KM])\)\s+(\d+)\s+\|', line) + if bucket_match_k and current_hist: + def parse_val(num, suffix): + v = int(num) + return v * 1024 if suffix == 'K' else v * 1048576 + low = parse_val(bucket_match_k.group(1), bucket_match_k.group(2)) + high = parse_val(bucket_match_k.group(3), bucket_match_k.group(4)) + count = int(bucket_match_k.group(5)) + result[current_hist]['buckets'].append({ + 'range_us': [low, high], + 'count': count + }) + result[current_hist]['raw'].append(line.rstrip()) + + return result + + @staticmethod + def _hist_percentile(buckets: List[Dict], pct: float) -> Dict: + """Compute a percentile bucket from a parsed histogram.""" + total = sum(b['count'] for b in buckets) + if total == 0: + return buckets[0] if buckets else {'range_us': [0, 0], 'count': 0} + target = total * pct / 100.0 + cumulative = 0 + for b in buckets: + cumulative += b['count'] + if cumulative >= target: + return b + return buckets[-1] + + def _print_trace_results(self, trace_data: Dict): + """Print parsed bpftrace histograms.""" + if not trace_data: + return + + print(f"\n### DEVICE LATENCY TRACING (bpftrace) ###") + + # ── Latency histograms ── + latency_histograms = [ + ('d2c_read_us', 'D2C Read (device hardware time)', 'us'), + ('d2c_write_us', 'D2C Write (device hardware time)', 'us'), + ('q2d_read_us', 'Q2D Read (I/O scheduler queue)', 'us'), + ('q2d_write_us', 'Q2D Write (I/O scheduler queue)', 'us'), + ('vfs_read_us', 'VFS Read (application-visible)', 'us'), + ('vfs_write_us', 'VFS Write (application-visible)', 'us'), + ('fsync_us', 'fsync (device flush)', 'us'), + ('write_to_fsync_us', 'Write-to-fsync gap (CPU serialization)', 'us'), + ('fadvise_to_read_us', 'fadvise-to-read gap (cache drop)', 'us'), + ] + + for key, label, unit in latency_histograms: + if key not in trace_data or not 
trace_data[key]['buckets']: + continue + buckets = trace_data[key]['buckets'] + total_count = sum(b['count'] for b in buckets) + if total_count == 0: + continue + + p50 = self._hist_percentile(buckets, 50) + p95 = self._hist_percentile(buckets, 95) + p99 = self._hist_percentile(buckets, 99) + + print(f"\n {label}:") + print(f" Samples: {total_count}") + print(f" P50: [{p50['range_us'][0]:,}, {p50['range_us'][1]:,}) {unit}") + print(f" P95: [{p95['range_us'][0]:,}, {p95['range_us'][1]:,}) {unit}") + print(f" P99: [{p99['range_us'][0]:,}, {p99['range_us'][1]:,}) {unit}") + for raw_line in trace_data[key]['raw']: + print(f" {raw_line}") + + # ── Block size distribution ── + bssplit_histograms = [ + ('bssplit_read_kb', 'Block Size Distribution (Reads)'), + ('bssplit_write_kb', 'Block Size Distribution (Writes)'), + ] + + for key, label in bssplit_histograms: + if key not in trace_data or not trace_data[key]['buckets']: + continue + buckets = trace_data[key]['buckets'] + total_count = sum(b['count'] for b in buckets) + if total_count == 0: + continue + + p50 = self._hist_percentile(buckets, 50) + + print(f"\n {label}:") + print(f" I/O count: {total_count}") + print(f" P50: [{p50['range_us'][0]:,}, {p50['range_us'][1]:,}) KB") + for raw_line in trace_data[key]['raw']: + print(f" {raw_line}") + + # ── Queue depth distribution ── + qd_histograms = [ + ('qd_read', 'Queue Depth at Dispatch (Reads)'), + ('qd_write', 'Queue Depth at Dispatch (Writes)'), + ] + + for key, label in qd_histograms: + if key not in trace_data or not trace_data[key]['buckets']: + continue + buckets = trace_data[key]['buckets'] + total_count = sum(b['count'] for b in buckets) + if total_count == 0: + continue + + p50 = self._hist_percentile(buckets, 50) + p95 = self._hist_percentile(buckets, 95) + + print(f"\n {label}:") + print(f" Samples: {total_count}") + print(f" P50: [{p50['range_us'][0]}, {p50['range_us'][1]})") + print(f" P95: [{p95['range_us'][0]}, {p95['range_us'][1]})") + for raw_line in 
trace_data[key]['raw']: + print(f" {raw_line}") + + # ── LBA heatmap ── + lba_histograms = [ + ('lba_read_gb', 'LBA Heatmap (Reads, 10 GB buckets)'), + ('lba_write_gb', 'LBA Heatmap (Writes, 10 GB buckets)'), + ] + + for key, label in lba_histograms: + if key not in trace_data or not trace_data[key]['buckets']: + continue + buckets = trace_data[key]['buckets'] + total_count = sum(b['count'] for b in buckets) + if total_count == 0: + continue + + # Find the hot zone (buckets with > 1% of I/O) + hot_zones = [b for b in buckets if b['count'] > total_count * 0.01] + if hot_zones: + hot_start = hot_zones[0]['range_us'][0] + hot_end = hot_zones[-1]['range_us'][1] + hot_pct = sum(b['count'] for b in hot_zones) * 100.0 / total_count + else: + hot_start = hot_end = hot_pct = 0 + + print(f"\n {label}:") + print(f" I/O count: {total_count}") + if hot_zones: + print(f" Hot zone: {hot_start}-{hot_end} GB ({hot_pct:.0f}% of I/O)") + for raw_line in trace_data[key]['raw']: + print(f" {raw_line}") + + def _generate_fio_workload(self, trace_data: Dict) -> Optional[str]: + """Generate a fio workload .ini file from bpftrace trace data. + + Distills the traced block-layer I/O pattern into a standalone fio config + that reproduces the same bssplit, read/write ratio, queue depth, and + idle time characteristics observed during the benchmark run. 
+ """ + # ── Validate minimum required histograms ── + required = ['bssplit_read_kb', 'bssplit_write_kb'] + for key in required: + if key not in trace_data or not trace_data[key].get('buckets'): + logger.warning(f"Missing {key} histogram; cannot generate fio workload") + return None + + # ── bssplit: convert histogram buckets to fio format ── + def hist_to_bssplit(buckets: List[Dict]) -> str: + total = sum(b['count'] for b in buckets) + if total == 0: + return "4k/100" + parts = [] + for b in buckets: + if b['count'] == 0: + continue + size_kb = b['range_us'][0] # lower bound of bucket + pct = int(round(b['count'] * 100.0 / total)) + if pct == 0 and b['count'] > 0: + pct = 1 # don't drop non-zero buckets + # Format size: use k for < 1024, m for >= 1024 + if size_kb >= 1024: + size_str = f"{size_kb // 1024}m" + else: + size_str = f"{size_kb}k" + parts.append(f"{size_str}/{pct}") + return ":".join(parts) if parts else "4k/100" + + read_bssplit = hist_to_bssplit(trace_data['bssplit_read_kb']['buckets']) + write_bssplit = hist_to_bssplit(trace_data['bssplit_write_kb']['buckets']) + bssplit_line = f"{read_bssplit},{write_bssplit}" + + # ── rwmixread: from I/O count ratio ── + read_count = sum(b['count'] for b in trace_data['bssplit_read_kb']['buckets']) + write_count = sum(b['count'] for b in trace_data['bssplit_write_kb']['buckets']) + total_io = read_count + write_count + rwmixread = int(round(read_count * 100.0 / total_io)) if total_io > 0 else 50 + + # ── iodepth: from QD histogram P50 ── + iodepth = 32 # default + for qd_key in ('qd_read', 'qd_write'): + if qd_key in trace_data and trace_data[qd_key].get('buckets'): + p50 = self._hist_percentile(trace_data[qd_key]['buckets'], 50) + candidate = max(1, p50['range_us'][0]) + iodepth = max(iodepth, candidate) + + # ── thinktime: from write_to_fsync gap (CPU idle between I/O bursts) ── + thinktime_us = 0 + if 'write_to_fsync_us' in trace_data and trace_data['write_to_fsync_us'].get('buckets'): + buckets = 
trace_data['write_to_fsync_us']['buckets'] + total_samples = sum(b['count'] for b in buckets) + if total_samples >= 4: + p50 = self._hist_percentile(buckets, 50) + thinktime_us = p50['range_us'][0] + + # ── thinktime_iotime: from fsync latency (active I/O burst duration) ── + thinktime_iotime_us = 0 + if 'fsync_us' in trace_data and trace_data['fsync_us'].get('buckets'): + buckets = trace_data['fsync_us']['buckets'] + total_samples = sum(b['count'] for b in buckets) + if total_samples >= 4: + p50 = self._hist_percentile(buckets, 50) + thinktime_iotime_us = p50['range_us'][0] + + # ── Build the fio config ── + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + model_name = self.model_config.name + bpt = self.model_config.kv_cache_size_per_token + + lines = [ + f"# KV Cache Benchmark; Traced Workload", + f"# Generated: {timestamp}", + f"# Model: {model_name}, Users: {self.num_users}, Duration: {self.duration}s", + f"# KV bytes/token: {bpt:,} bytes ({bpt/1024:.0f} KB)", + f"#", + f"# Distilled from bpftrace block-layer tracing during benchmark run.", + f"# Total traced I/Os: {total_io:,} ({read_count:,} reads, {write_count:,} writes)", + f"#", + f"# Usage:", + f"# fio --filename=/dev/nvmeXn1", + f"# fio --filename=/mnt/nvme/fio_test --size=100G", + f"", + f"[kv-cache-traced]", + f"ioengine=libaio", + f"direct=1", + f"time_based", + f"runtime=300", + f"rw=randrw", + f"rwmixread={rwmixread}", + f"bssplit={bssplit_line}", + f"iodepth={iodepth}", + f"iodepth_batch_submit={iodepth}", + f"iodepth_batch_complete_min=1", + f"size=100%", + ] + + if thinktime_us > 0: + lines.extend([ + f"thinktime={thinktime_us}", + f"thinktime_blocks={iodepth}", + ]) + if thinktime_iotime_us > 0: + # thinktime_iotime requires fio 3.28+ + # Converts active I/O period from microseconds to seconds + thinktime_iotime_s = max(1, thinktime_iotime_us // 1000000) + lines.append(f"# thinktime_iotime={thinktime_iotime_s}s # uncomment for fio 3.28+ (active I/O burst before idle)") + + 
lines.extend([ + f"refill_buffers=1", + f"norandommap=1", + f"randrepeat=0", + f"numjobs=1", + f"group_reporting", + f"percentile_list=50:95:99:99.9:99.99", + ]) + + return "\n".join(lines) + "\n" + + def _check_memory_safety(self): + """Estimate peak memory usage and warn if OOM is likely. + + Peak memory per worker thread comes from access_cache -> np.load + np.array + which briefly holds 2x the entry size in RAM. With N worker threads running + concurrently, peak = N * 2 * mean_entry_bytes + baseline (precomputed buffer, + Python/torch overhead, OS). + """ + try: + import psutil + available_ram = psutil.virtual_memory().available + except ImportError: + try: + with open('/proc/meminfo', 'r') as f: + for line in f: + if line.startswith('MemAvailable:'): + available_ram = int(line.split()[1]) * 1024 # kB to bytes + break + else: + return # can't determine + except (FileNotFoundError, ValueError): + return # non-Linux or parse error + + bpt = self.model_config.kv_cache_size_per_token + # Mean context from user templates: midpoint of chatbot + coding + document ranges + mean_context_tokens = 8000 # conservative estimate + mean_entry_bytes = mean_context_tokens * bpt + # np.load + np.array = 2x entry size per concurrent read + per_thread_peak = mean_entry_bytes * 2 + # Worker threads = min(num_users, 500); all can read concurrently + num_workers = min(self.num_users, 500) + # Baseline: precomputed buffer (~256 MB) + Python/torch overhead (~2 GB) + baseline = 2.5 * 1024**3 + estimated_peak = (num_workers * per_thread_peak) + baseline + + safe_workers = max(1, int((available_ram - baseline) / per_thread_peak)) if per_thread_peak > 0 else num_workers + + print(f"\n### MEMORY SAFETY CHECK ###") + print(f" Formula: peak = (workers x 2 x mean_entry_bytes) + baseline") + print(f" = ({num_workers} x 2 x {mean_entry_bytes / 1024**2:.0f} MB) + {baseline / 1024**3:.1f} GB") + print(f" = {estimated_peak / 1024**3:.1f} GB") + print(f" Available RAM: {available_ram / 1024**3:.1f} 
GB") + print(f" Mean entry size: {mean_entry_bytes / 1024**2:.0f} MB ({mean_context_tokens} tok x {bpt:,} B/tok)") + print(f" Peak per thread: {per_thread_peak / 1024**2:.0f} MB (np.load + np.array copy)") + print(f" Worker threads: {num_workers}") + print(f" Safe concurrent readers: ~{safe_workers} = (available - baseline) / peak_per_thread") + + if estimated_peak > available_ram * 0.85: + print(f" WARNING: Estimated peak memory ({estimated_peak / 1024**3:.1f} GB) exceeds 85% of") + print(f" available RAM ({available_ram / 1024**3:.1f} GB). Risk of OOM with {num_workers} workers.") + print(f" Consider: --num-users {min(safe_workers, self.num_users)} or --max-concurrent-allocs {max(1, safe_workers // 2)}") + else: + print(f" Status: OK") + def run(self) -> Dict: """The main entry point to start the benchmark execution.""" print(f"\nIntegrated Multi-User KV Cache Benchmark - MLPerf Edition") @@ -714,6 +1165,8 @@ def run(self) -> Dict: print(f" Conversations: {self.sharegpt_loader.token_stats.get('total_conversations', 0)}") print(f" Total Turns: {self.sharegpt_loader.token_stats.get('total_turns', 0)}") + self._check_memory_safety() + if self.precondition: self._run_preconditioning() @@ -727,6 +1180,9 @@ def run(self) -> Dict: mode_str = "PREFILL-ONLY (write-heavy, disaggregated prefill node)" elif self.decode_only: mode_str = "DECODE-ONLY (read-heavy, assumes KV cache pre-populated)" + if self.enable_latency_tracing: + self._start_latency_tracing() + print(f"\nStarting benchmark... 
Mode: {mode_str}") print("-" * 80) @@ -763,8 +1219,24 @@ def run(self) -> Dict: for thread in threads: thread.join(timeout=2.0) + # Stop tracing and collect results before stats calculation + trace_data = None + if self.enable_latency_tracing: + trace_data = self._stop_latency_tracing() + self._calculate_stats(actual_duration) + if trace_data: + self.results['device_latency_tracing'] = trace_data + self._print_trace_results(trace_data) + + fio_config = self._generate_fio_workload(trace_data) + if fio_config: + self.results['fio_workload'] = fio_config + print(f"\n### GENERATED FIO WORKLOAD ###") + for line in fio_config.strip().split('\n'): + print(f" {line}") + if self.validator: self.results['validation'] = self.validator.validate_benchmark(self.results) diff --git a/kv_cache_benchmark/kv_cache/cache.py b/kv_cache_benchmark/kv_cache/cache.py index e1d904ae..69621f8e 100755 --- a/kv_cache_benchmark/kv_cache/cache.py +++ b/kv_cache_benchmark/kv_cache/cache.py @@ -710,6 +710,22 @@ def _allocate_cache_inner(self, key: str, num_tokens: int, phase: InferencePhase del data return False, 'none', 0.0 + def check_cache_exists(self, key: str) -> Tuple[Optional[str], int]: + """Metadata-only existence check. No I/O, no LRU update. + + Used by multi-turn virtual context checks: determines whether a + previous turn's KV cache entry survived LRU eviction without + loading data from disk (no np.load, no memory allocation). + + Returns: + (location, size_bytes) if the entry exists, (None, 0) otherwise. 
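+
+        Illustrative usage (the key name here is hypothetical):
+
+            location, size = cache.check_cache_exists("conv_abc_turn_1")
+            if location is None:
+                pass  # evicted or never written; count as a multi-turn miss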
+ """ + with self.metadata_lock: + entry = self.cache_entries.get(key) + if entry is None: + return None, 0 + return entry['location'], entry['size'] + def access_cache(self, key: str, phase: InferencePhase = InferencePhase.DECODE, cache_type: str = 'user') -> Tuple[Optional[str], float]: """Accesses an existing cached entry and records the read performance.""" diff --git a/kv_cache_benchmark/kv_cache/cli.py b/kv_cache_benchmark/kv_cache/cli.py index 03864c3b..c1f8b63f 100755 --- a/kv_cache_benchmark/kv_cache/cli.py +++ b/kv_cache_benchmark/kv_cache/cli.py @@ -208,6 +208,78 @@ def get_nested(d, keys, default=None): qos_df = pd.DataFrame(qos_rows) qos_df.to_excel(writer, sheet_name='QoS Metrics', index=False) + # Device tracing sheet (when --enable-latency-tracing is used) + trace_data = results.get('device_latency_tracing', {}) + if trace_data: + trace_rows = [] + display_order = [ + ('d2c_read_us', 'D2C Read (us)', 'Device hardware time'), + ('d2c_write_us', 'D2C Write (us)', 'Device hardware time'), + ('q2d_read_us', 'Q2D Read (us)', 'I/O scheduler queue'), + ('q2d_write_us', 'Q2D Write (us)', 'I/O scheduler queue'), + ('vfs_read_us', 'VFS Read (us)', 'Application-visible'), + ('vfs_write_us', 'VFS Write (us)', 'Application-visible'), + ('fsync_us', 'fsync (us)', 'Device flush'), + ('write_to_fsync_us', 'Write-to-fsync (us)', 'CPU serialization gap'), + ('fadvise_to_read_us', 'fadvise-to-read (us)', 'Cache drop overhead'), + ('bssplit_read_kb', 'Block Size Read (KB)', 'I/O size distribution'), + ('bssplit_write_kb', 'Block Size Write (KB)', 'I/O size distribution'), + ('qd_read', 'Queue Depth Read', 'Instantaneous QD at dispatch'), + ('qd_write', 'Queue Depth Write', 'Instantaneous QD at dispatch'), + ('lba_read_gb', 'LBA Heatmap Read (GB)', 'Spatial I/O distribution'), + ('lba_write_gb', 'LBA Heatmap Write (GB)', 'Spatial I/O distribution'), + ] + + def hist_pct(buckets, pct): + total = sum(b['count'] for b in buckets) + if total == 0: + return 0 + target = 
total * pct / 100.0 + cum = 0 + for b in buckets: + cum += b['count'] + if cum >= target: + return b['range_us'][0] + return buckets[-1]['range_us'][0] + + for key, label, description in display_order: + if key not in trace_data or not trace_data[key].get('buckets'): + continue + buckets = trace_data[key]['buckets'] + total_count = sum(b['count'] for b in buckets) + if total_count == 0: + continue + trace_rows.append({ + 'Metric': label, + 'Description': description, + 'Samples': total_count, + 'P50': hist_pct(buckets, 50), + 'P95': hist_pct(buckets, 95), + 'P99': hist_pct(buckets, 99), + 'Min Bucket': buckets[0]['range_us'][0], + 'Max Bucket': buckets[-1]['range_us'][1], + }) + + if trace_rows: + trace_df = pd.DataFrame(trace_rows) + trace_df.to_excel(writer, sheet_name='Device Tracing', index=False) + + # Raw histograms sheet + raw_rows = [] + for key, label, _ in display_order: + if key not in trace_data or not trace_data[key].get('buckets'): + continue + for b in trace_data[key]['buckets']: + raw_rows.append({ + 'Histogram': label, + 'Bucket Low': b['range_us'][0], + 'Bucket High': b['range_us'][1], + 'Count': b['count'], + }) + if raw_rows: + raw_df = pd.DataFrame(raw_rows) + raw_df.to_excel(writer, sheet_name='Trace Histograms', index=False) + logger.info(f"XLSX results saved to {output_path}") else: csv_path = output_path.replace('.xlsx', '.csv') if output_path.endswith('.xlsx') else output_path @@ -299,6 +371,8 @@ def main(): help='Simulate disaggregated prefill node (write-heavy, no decode reads).') parser.add_argument('--decode-only', action='store_true', help='Simulate disaggregated decode node (read-heavy, assumes KV cache exists).') + parser.add_argument('--enable-latency-tracing', action='store_true', + help='Enable bpftrace device latency tracing (requires sudo, bpftrace).') args = parser.parse_args() @@ -377,7 +451,8 @@ def main(): trace_speedup=args.trace_speedup, replay_cycles=args.replay_cycles, prefill_only=args.prefill_only, - 
decode_only=args.decode_only + decode_only=args.decode_only, + enable_latency_tracing=args.enable_latency_tracing ) results = benchmark.run() @@ -401,6 +476,15 @@ def convert_numpy(obj): if args.xlsx_output: export_results_to_xlsx(results, args, args.xlsx_output) + # Save fio workload file when latency tracing produced one + fio_config = results.get('fio_workload') + if fio_config: + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + fio_filename = f"fio_kv_cache_workload_{timestamp}.ini" + with open(fio_filename, 'w') as f: + f.write(fio_config) + logger.info(f"fio workload saved to {fio_filename}") + if __name__ == "__main__": main() diff --git a/kv_cache_benchmark/tests/test_kv_cache.py b/kv_cache_benchmark/tests/test_kv_cache.py index df8d7909..baf67c22 100644 --- a/kv_cache_benchmark/tests/test_kv_cache.py +++ b/kv_cache_benchmark/tests/test_kv_cache.py @@ -3341,31 +3341,21 @@ def test_part3b_request_to_npy_file_mapping(self, tiny_model): def test_part3c_multi_turn_prefill_decode_file_io(self, tiny_model): """ - Shows how a multi-turn conversation creates and reads .npy files. + Shows how a multi-turn conversation reads ALL previous turns. - Conversation with 4 turns: + Each turn writes its own new tokens. On subsequent turns, ALL + previous turns are read via access_cache to reload the full + conversation context. Evicted turns return (None, 0.0) with + no I/O; surviving turns trigger real np.load reads. - Turn 1 (no previous context): - cache_key = "conv_XXX_turn_1" - PREFILL: allocate_cache() → WRITE conv_XXX_turn_1.npy (new file) - DECODE: access_cache() → READ conv_XXX_turn_1.npy + Turn 1: WRITE turn_1 + READ turn_1 (decode) + Turn 2: READ [turn_1] + WRITE turn_2 + READ turn_2 + Turn 3: READ [turn_1, turn_2] + WRITE turn_3 + READ turn_3 + Turn 4: READ [turn_1, turn_2, turn_3] + WRITE turn_4 + READ turn_4 - Turn 2 (has previous turn): - cache_key = "conv_XXX_turn_2" - MULTI-TURN READ: access_cache(turn_1) → READ conv_XXX_turn_1.npy ← reuse! 
- PREFILL: allocate_cache() → WRITE conv_XXX_turn_2.npy (new file) - DECODE: access_cache() → READ conv_XXX_turn_2.npy - - Turn 3: - MULTI-TURN READ: access_cache(turn_2) → READ conv_XXX_turn_2.npy ← reuse! - PREFILL: WRITE conv_XXX_turn_3.npy - DECODE: READ conv_XXX_turn_3.npy - - Each turn: - - Reads the PREVIOUS turn's .npy (multi-turn cache reuse) - - Writes a NEW .npy for this turn's KV cache - - Reads the NEW .npy during decode - - File count grows by 1 per turn (until eviction cleans old ones) + Multi-turn reads (triangular): 0+1+2+3 = 6 + Decode reads: 4 (one per turn) + Total decode_reads stat: 10 This is the exact flow from benchmark.py process_requests() steps 2, 3, 5. """ @@ -3411,25 +3401,21 @@ def test_part3c_multi_turn_prefill_decode_file_io(self, tiny_model): storage_latency = 0.0 file_ops = [] - # ── Step 2: Multi-turn read (previous turn's cache) ── + # ── Step 2: Multi-turn read (ALL previous turns) ── if turn > 1: - prev_key = f"{conv_id}_turn_{turn - 1}" - prev_file = nvme_dir / f"{prev_key}.npy" - - print(f"\n Step 2: MULTI-TURN READ (reuse previous turn)") - print(f" Read: {prev_key}.npy") - print(f" Exists: {prev_file.exists()}") - - location, read_lat = cache.access_cache( - prev_key, InferencePhase.DECODE, 'multi_turn' - ) - storage_latency += read_lat - file_ops.append(f"READ {prev_key}.npy ({read_lat*1000:.3f} ms) [multi-turn reuse]") - - if location: - print(f" Hit: location={location}, latency={read_lat*1000:.3f} ms") - else: - print(f" Miss: previous turn not in cache") + prev_keys = conv_mgr.get_all_previous_turn_keys(conv_id, turn) + print(f"\n Step 2: MULTI-TURN READ ({len(prev_keys)} previous turn(s))") + for prev_key in prev_keys: + location, read_lat = cache.access_cache( + prev_key, InferencePhase.DECODE, 'multi_turn' + ) + storage_latency += read_lat + if location: + file_ops.append(f"READ {prev_key}.npy ({read_lat*1000:.3f} ms) [hit, {location}]") + print(f" {prev_key} -> Hit ({location}, {read_lat*1000:.3f} ms)") + else: + 
file_ops.append(f"READ {prev_key}.npy [miss, evicted]") + print(f" {prev_key} -> Miss (evicted)") else: print(f"\n Step 2: MULTI-TURN READ — skipped (turn 1, no history)") @@ -3488,34 +3474,145 @@ def test_part3c_multi_turn_prefill_decode_file_io(self, tiny_model): print(f" Total write bytes: {cache.stats['total_write_bytes']/1024:.0f} KB") print(f" Total read bytes: {cache.stats['total_read_bytes']/1024:.0f} KB") - print(f"\n File-per-turn pattern:") - print(f" Turn 1: WRITE turn_1.npy + READ turn_1.npy") - print(f" Turn 2: READ turn_1.npy + WRITE turn_2.npy + READ turn_2.npy") - print(f" Turn 3: READ turn_2.npy + WRITE turn_3.npy + READ turn_3.npy") - print(f" Turn N: READ turn_(N-1).npy + WRITE turn_N.npy + READ turn_N.npy") + multi_turn_reads = (num_turns * (num_turns - 1)) // 2 + + print(f"\n Full-context reload pattern:") + print(f" Turn 1: WRITE turn_1 + READ turn_1") + print(f" Turn 2: READ [turn_1] + WRITE turn_2 + READ turn_2") + print(f" Turn 3: READ [turn_1, turn_2] + WRITE turn_3 + READ turn_3") + print(f" Turn N: READ [turns 1..N-1] + WRITE turn_N + READ turn_N") print(f"") print(f" I/O per turn:") print(f" Turn 1: 1 write + 1 read = 2 I/O ops") - print(f" Turn 2+: 1 write + 2 reads = 3 I/O ops (extra read = multi-turn reuse)") - print(f"") - print(f" Write amplification over {num_turns} turns:") - total_data = num_turns * context_per_turn * bpt - total_written = cache.stats['total_write_bytes'] - print(f" Unique KV data: {total_data/1024:.0f} KB " - f"({num_turns} turns × {context_per_turn} tok × {bpt:,d} B)") - print(f" Bytes written: {total_written/1024:.0f} KB") - print(f" Ratio: {total_written / total_data:.2f}x") + print(f" Turn N: 1 write + N reads = {num_turns + 1} I/O ops (turn {num_turns})") + print(f" Multi-turn reads (triangular): {multi_turn_reads}") # Assertions assert len(all_npy) == num_turns, \ f"Should have {num_turns} .npy files (one per turn), got {len(all_npy)}" assert cache.stats['prefill_writes'] == num_turns, \ f"Should have 
{num_turns} prefill writes" - # decode_reads: turn 1 has 1, turns 2-4 have 2 each (multi-turn + decode) - expected_reads = 1 + (num_turns - 1) * 2 + # decode_reads = multi-turn reads (triangular) + decode reads (one per turn) + expected_reads = num_turns + multi_turn_reads assert cache.stats['decode_reads'] == expected_reads, \ f"Expected {expected_reads} decode reads, got {cache.stats['decode_reads']}" + # ------------------------------------------------------------------ + # Part 3d: Multi-turn with eviction pressure + # ------------------------------------------------------------------ + + def test_part3d_multi_turn_with_eviction(self, tiny_model): + """ + Shows what happens when previous turns get evicted by the LRU + waterfall before the next turn reads them. + + Setup: NVMe capacity fits only 3 entries. A 6-turn conversation + forces turns 1-3 to be evicted as turns 4-6 are written. The + multi-turn reads for later turns show a mix of hits and misses. + + This validates that access_cache returns (None, 0.0) cleanly for + evicted entries, and that the benchmark correctly counts misses. 
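+
+        Illustrative expected shape (the exact hit/miss split depends on
+        LRU ordering and entry sizes, not guaranteed turn-by-turn):
+
+            turn 4 reads turns 1-3 -> oldest turn(s) already evicted (miss)
+            turn 6 reads turns 1-5 -> only the ~3 newest turns survive (hit)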
+ """ + print("\n" + "=" * 72) + print(" PART 3d: MULTI-TURN WITH EVICTION PRESSURE") + print("=" * 72) + + bpt = tiny_model.kv_cache_size_per_token + context_per_turn = 200 + entry_bytes = context_per_turn * bpt + # Capacity for ~3 entries; turns 4+ force eviction of oldest + capacity_bytes = entry_bytes * 3.5 + capacity_gb = capacity_bytes / (1024 ** 3) + + cache = MultiTierCache( + model_config=tiny_model, + gpu_memory_gb=0, + cpu_memory_gb=0, + seed=42, + storage_capacity_gb=capacity_gb, + ) + + nvme_dir = cache.backends['nvme'].base_path + conv_mgr = ConversationManager(max_conversations=10) + conv_id = conv_mgr.start_conversation("bob") + + num_turns = 6 + print(f"\n Entry size: {entry_bytes/1024:.0f} KB") + print(f" NVMe capacity: {capacity_bytes/1024:.0f} KB (~{capacity_bytes/entry_bytes:.1f} entries)") + print(f" Turns: {num_turns} (will force eviction after turn 3)") + + total_hits = 0 + total_misses = 0 + + for turn in range(1, num_turns + 1): + turn_num, cache_key = conv_mgr.add_turn(conv_id, context_per_turn, 50) + + print(f"\n Turn {turn}:") + + # Step 2: read all previous turns + if turn > 1: + prev_keys = conv_mgr.get_all_previous_turn_keys(conv_id, turn) + hits = 0 + misses = 0 + for prev_key in prev_keys: + location, _ = cache.access_cache(prev_key, InferencePhase.DECODE, 'multi_turn') + if location is not None: + hits += 1 + else: + misses += 1 + total_hits += hits + total_misses += misses + print(f" Step 2: read {len(prev_keys)} previous turns -> {hits} hits, {misses} misses") + + # Step 3: write this turn + success, tier, _ = cache.allocate_cache( + cache_key, num_tokens=context_per_turn, phase=InferencePhase.PREFILL + ) + npy_count = len(list(nvme_dir.glob("*.npy"))) + print(f" Step 3: write {cache_key} -> {tier}, files on disk: {npy_count}") + + npy_files = sorted(nvme_dir.glob("*.npy")) + print(f"\n {'=' * 64}") + print(f" EVICTION SUMMARY") + print(f" {'=' * 64}") + print(f" Multi-turn hits: {total_hits}") + print(f" Multi-turn misses: 
{total_misses}") + print(f" Files on disk: {len(npy_files)} (capacity held ~3)") + print(f" Evictions: {cache.stats['evictions']}") + print(f" Cache misses: {cache.stats['cache_misses']} (tier-level, all types)") + for f in npy_files: + print(f" {f.name}") + + print(f"\n HOW CACHE MISSES INCREMENT:") + print(f" ────────────────────────────") + print(f" Two separate counters track misses:") + print(f"") + print(f" 1. cache.stats['cache_misses'] (tier-level, inside access_cache):") + print(f" Increments when ANY access_cache() call finds the key missing") + print(f" from cache_entries. Covers all miss types: multi-turn, decode,") + print(f" prefix, RAG. Returns (None, 0.0) immediately; no I/O, no np.load.") + print(f"") + print(f" 2. results['multi_turn_cache_misses'] (benchmark-level, step 2 only):") + print(f" Increments when a multi-turn access_cache() returns None.") + print(f" This is a SUBSET of cache_misses; it counts only the misses") + print(f" from the step 2 previous-turn read loop.") + print(f"") + print(f" In this test: {cache.stats['cache_misses']} tier-level misses total,") + print(f" {total_misses} of which are multi-turn misses from evicted turns.") + print(f"") + print(f" A high multi-turn miss rate ({total_misses}/{total_hits + total_misses}" + f" = {total_misses*100/max(total_hits+total_misses,1):.0f}%) means the") + print(f" NVMe tier is too small to hold the full conversation history.") + print(f" The LRU waterfall evicts old turns before they can be reused.") + + # Assertions + assert total_misses > 0, "Eviction should cause some multi-turn misses" + assert total_hits > 0, "Recent turns should still be cached" + assert cache.stats['evictions'] > 0, "Eviction should have occurred" + assert cache.stats['cache_misses'] >= total_misses, \ + "Tier-level misses should be >= multi-turn misses (superset)" + assert len(npy_files) <= 4, "NVMe should hold ~3 entries, not all 6" + # ------------------------------------------------------------------ # Part 
4: 3-tier waterfall LRU eviction # ------------------------------------------------------------------ diff --git a/kv_cache_benchmark/utils/distill_fio.py b/kv_cache_benchmark/utils/distill_fio.py new file mode 100755 index 00000000..09dd50e4 --- /dev/null +++ b/kv_cache_benchmark/utils/distill_fio.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +""" +distill_fio.py — Convert bpftrace histogram output to a fio workload file. + +Parses the output of storage_latency_stack.bt and generates a standalone +fio .ini that reproduces the same I/O pattern: bssplit, read/write ratio, +queue depth, and idle time. + +Usage: + # Pipe from bpftrace: + sudo bpftrace storage_latency_stack.bt "vllm" 2>&1 | python3 distill_fio.py + + # From saved file: + python3 distill_fio.py < trace_output.txt + + # With custom output name: + python3 distill_fio.py -o vllm_workload.ini < trace_output.txt + + # Standalone (called by storage_latency_stack.sh --fio): + python3 distill_fio.py -i /tmp/trace_raw.txt -o fio_traced.ini --process vllm + +Author: Hazem Awadallah, Kingston Digital +License: Apache-2.0, donated to MLCommons +""" + +import re +import sys +import argparse +from datetime import datetime +from typing import Dict, List, Optional, Tuple + + +def parse_bpftrace_output(text: str) -> Dict: + """Parse bpftrace histogram output into structured dict.""" + result = {} + current_hist = None + + for line in text.split('\n'): + # Match histogram name: @histogram_name: + hist_match = re.match(r'^\s*@(\w+):\s*$', line) + if hist_match: + current_hist = hist_match.group(1) + result[current_hist] = [] + continue + + if current_hist is None: + continue + + # Match numeric bucket: [low, high) count |@@@@| + m = re.match(r'^\[(\d+),\s*(\d+)\)\s+(\d+)\s+\|', line) + if m: + result[current_hist].append({ + 'low': int(m.group(1)), + 'high': int(m.group(2)), + 'count': int(m.group(3)), + }) + continue + + # Match K/M bucket: [1K, 2K) count |@@@@| + m = 
re.match(r'^\[(\d+)([KMG]?),\s*(\d+)([KMG]?)\)\s+(\d+)\s+\|', line)
+        if m:
+            def to_val(n, s):
+                # bpftrace power-of-2 buckets mix plain and suffixed bounds,
+                # e.g. [512, 1K); an empty suffix means a bare value
+                v = int(n)
+                return v * {'': 1, 'K': 1024, 'M': 1048576, 'G': 1073741824}[s]
+            result[current_hist].append({
+                'low': to_val(m.group(1), m.group(2)),
+                'high': to_val(m.group(3), m.group(4)),
+                'count': int(m.group(5)),
+            })
+
+    return result
+
+
+def hist_percentile(buckets: List[Dict], pct: float) -> int:
+    """Return the lower bound of the bucket containing the given percentile."""
+    total = sum(b['count'] for b in buckets)
+    if total == 0:
+        return 0
+    target = total * pct / 100.0
+    cumulative = 0
+    for b in buckets:
+        cumulative += b['count']
+        if cumulative >= target:
+            return b['low']
+    return buckets[-1]['low'] if buckets else 0
+
+
+def hist_to_bssplit(buckets: List[Dict]) -> str:
+    """Convert a bpftrace size histogram (KB) to fio bssplit format."""
+    total = sum(b['count'] for b in buckets)
+    if total == 0:
+        return "4k/100"
+    parts = []
+    for b in buckets:
+        if b['count'] == 0:
+            continue
+        size_kb = b['low']
+        pct = int(round(b['count'] * 100.0 / total))
+        if pct == 0 and b['count'] > 0:
+            pct = 1
+        if size_kb >= 1024:
+            size_str = f"{size_kb // 1024}m"
+        elif size_kb == 0:
+            continue  # skip zero-size buckets
+        else:
+            size_str = f"{size_kb}k"
+        parts.append(f"{size_str}/{pct}")
+    return ":".join(parts) if parts else "4k/100"
+
+
+def generate_fio(histograms: Dict, process_name: str = "") -> str:
+    """Generate a fio .ini config from parsed histograms."""
+
+    # ── bssplit ──
+    read_bs = histograms.get('bssplit_read_kb', [])
+    write_bs = histograms.get('bssplit_write_kb', [])
+    read_bssplit = hist_to_bssplit(read_bs)
+    write_bssplit = hist_to_bssplit(write_bs)
+
+    # ── rwmixread ──
+    read_count = sum(b['count'] for b in read_bs)
+    write_count = sum(b['count'] for b in write_bs)
+    total_io = read_count + write_count
+    rwmixread = int(round(read_count * 100.0 / total_io)) if total_io > 0 else 50
+
+    # ── iodepth: P50 of the traced QD histograms, floored at 32 ──
+    iodepth = 32
+    for qd_key in ('qd_read',
'qd_write'): + buckets = histograms.get(qd_key, []) + if buckets: + candidate = max(1, hist_percentile(buckets, 50)) + iodepth = max(iodepth, candidate) + + # ── thinktime from write_to_fsync gap ── + thinktime_us = 0 + wt_buckets = histograms.get('write_to_fsync_us', []) + if sum(b['count'] for b in wt_buckets) >= 4: + thinktime_us = hist_percentile(wt_buckets, 50) + + # ── thinktime_iotime from fsync latency ── + thinktime_iotime_us = 0 + fs_buckets = histograms.get('fsync_us', []) + if sum(b['count'] for b in fs_buckets) >= 4: + thinktime_iotime_us = hist_percentile(fs_buckets, 50) + + # ── LBA summary ── + lba_summary = "" + for lba_key, direction in [('lba_read_gb', 'Read'), ('lba_write_gb', 'Write')]: + buckets = histograms.get(lba_key, []) + total = sum(b['count'] for b in buckets) + if total > 0: + hot = [b for b in buckets if b['count'] > total * 0.01] + if hot: + lba_summary += f"# {direction} hot zone: {hot[0]['low']}-{hot[-1]['high']} GB ({sum(b['count'] for b in hot)*100//total}% of I/O)\n" + + # ── D2C summary ── + d2c_summary = "" + for key, direction in [('d2c_read_us', 'Read'), ('d2c_write_us', 'Write')]: + buckets = histograms.get(key, []) + total = sum(b['count'] for b in buckets) + if total > 0: + p50 = hist_percentile(buckets, 50) + p99 = hist_percentile(buckets, 99) + d2c_summary += f"# D2C {direction}: P50={p50} us, P99={p99} us ({total} samples)\n" + + # ── Build config ── + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + job_name = process_name.replace('-', '_') if process_name else "traced" + + lines = [ + f"# Storage Latency Stack; Distilled fio Workload", + f"# Generated: {timestamp}", + f"# Source process: {process_name or '(all)'}", + f"# Total traced I/Os: {total_io:,} ({read_count:,} reads, {write_count:,} writes)", + f"#", + ] + if d2c_summary: + lines.append(f"# Device latency (D2C):") + lines.append(d2c_summary.rstrip()) + if lba_summary: + lines.append(f"# LBA spatial distribution:") + 
lines.append(lba_summary.rstrip()) + lines.extend([ + f"#", + f"# Usage:", + f"# fio --filename=/dev/nvmeXn1", + f"# fio --filename=/mnt/nvme/fio_test --size=100G", + f"", + f"[{job_name}_workload]", + f"ioengine=libaio", + f"direct=1", + f"time_based", + f"runtime=300", + f"rw=randrw", + f"rwmixread={rwmixread}", + f"bssplit={read_bssplit},{write_bssplit}", + f"iodepth={iodepth}", + f"iodepth_batch_submit={iodepth}", + f"iodepth_batch_complete_min=1", + f"size=100%", + ]) + + if thinktime_us > 0: + lines.append(f"thinktime={thinktime_us}") + lines.append(f"thinktime_blocks={iodepth}") + if thinktime_iotime_us > 0: + iotime_s = max(1, thinktime_iotime_us // 1000000) + lines.append(f"# thinktime_iotime={iotime_s}s # uncomment for fio 3.28+") + + lines.extend([ + f"refill_buffers=1", + f"norandommap=1", + f"randrepeat=0", + f"numjobs=1", + f"group_reporting", + f"percentile_list=50:95:99:99.9:99.99", + ]) + + return "\n".join(lines) + "\n" + + +def main(): + parser = argparse.ArgumentParser( + description='Distill bpftrace trace output into a fio workload file.' + ) + parser.add_argument('-i', '--input', default=None, + help='Input file (bpftrace output). Default: stdin') + parser.add_argument('-o', '--output', default=None, + help='Output fio .ini file. 
Default: fio_traced_TIMESTAMP.ini')
+    parser.add_argument('--process', default='',
+                        help='Process name (for fio job naming and comments)')
+    parser.add_argument('--stdout', action='store_true',
+                        help='Print fio config to stdout instead of file')
+    args = parser.parse_args()
+
+    # Read input
+    if args.input:
+        with open(args.input, 'r') as f:
+            text = f.read()
+    else:
+        text = sys.stdin.read()
+
+    if not text.strip():
+        print("Error: empty input", file=sys.stderr)
+        sys.exit(1)
+
+    # Parse
+    histograms = parse_bpftrace_output(text)
+    if not histograms:
+        print("Error: no histograms found in input", file=sys.stderr)
+        sys.exit(1)
+
+    # Check for required histograms
+    has_bssplit = 'bssplit_read_kb' in histograms or 'bssplit_write_kb' in histograms
+    if not has_bssplit:
+        print("Error: no bssplit histograms found. Was storage_latency_stack.bt used?", file=sys.stderr)
+        sys.exit(1)
+
+    # Generate
+    fio_config = generate_fio(histograms, args.process)
+
+    if args.stdout:
+        print(fio_config)
+    else:
+        if args.output:
+            outpath = args.output
+        else:
+            ts = datetime.now().strftime('%Y%m%d_%H%M%S')
+            outpath = f"fio_traced_{ts}.ini"
+        with open(outpath, 'w') as f:
+            f.write(fio_config)
+        print(f"fio workload saved to {outpath}")
+
+
+if __name__ == '__main__':
+    main()
diff --git a/kv_cache_benchmark/utils/storage_latency_stack.bt b/kv_cache_benchmark/utils/storage_latency_stack.bt
new file mode 100644
index 00000000..681be5cd
--- /dev/null
+++ b/kv_cache_benchmark/utils/storage_latency_stack.bt
@@ -0,0 +1,317 @@
+#!/usr/bin/env bpftrace
+/*
+ * storage_latency_stack.bt — Full-Stack Storage Latency Diagnostic Tool
+ *
+ * Decomposes storage I/O latency across every layer of the Linux I/O stack:
+ *
+ *   Application (VFS)
+ *     └── Filesystem / Page Cache
+ *       └── Block Layer (I/O Scheduler)
+ *         └── NVMe / SCSI Driver
+ *           └── Physical Device
+ *
+ * Core latency histograms produced on Ctrl-C:
+ *
+ *   @q2d_read_us  — Queue-to-Dispatch (I/O scheduler wait) for reads
+ *   @q2d_write_us —
Queue-to-Dispatch (I/O scheduler wait) for writes + * @d2c_read_us — Dispatch-to-Complete (actual device hardware time) for reads + * @d2c_write_us — Dispatch-to-Complete (actual device hardware time) for writes + * @vfs_read_us — Full VFS read latency (application-visible) + * @vfs_write_us — Full VFS write latency (application-visible) + * + * Plus block size, queue depth, and LBA distribution: + * + * @bssplit_read_kb — Block size distribution for reads (KB) + * @bssplit_write_kb — Block size distribution for writes (KB) + * @qd_read — Instantaneous queue depth at dispatch (reads) + * @qd_write — Instantaneous queue depth at dispatch (writes) + * @lba_read_gb — LBA heatmap for reads (10 GB linear buckets) + * @lba_write_gb — LBA heatmap for writes (10 GB linear buckets) + * + * Plus the write-path serialization gap: + * + * @write_to_fsync_us — Time between write() return and fsync() entry + * (captures CPU serialization, locks, GIL overhead) + * @fsync_us — Actual fsync() latency (device flush) + * + * Interpreting results: + * + * If D2C >> Q2D: Device is the bottleneck (slow NAND, MDTS splitting) + * If Q2D >> D2C: I/O scheduler contention (too many threads, bad scheduler) + * If VFS >> Q2D + D2C: Filesystem/page cache/serialization overhead dominates + * If write_to_fsync >> fsync: CPU serialization is the bottleneck, not storage + * + * Bimodal VFS patterns: + * + * Fast (µs) + Slow (ms) reads = page cache hits + device misses + * Fast (µs) + Slow (ms) writes = buffered writes + fsync flushes + * If ALL reads are fast (µs) = page cache absorbing everything — NOT testing storage! 
+ *
+ * Usage (the process-name filter is positional parameter $1, matching
+ * the str($1) predicates in the VFS and syscall probes below):
+ *
+ *   # Trace all I/O to all devices, filter VFS to a specific process:
+ *   sudo bpftrace storage_latency_stack.bt "kv-cache"
+ *
+ *   # Trace a Python script:
+ *   sudo bpftrace storage_latency_stack.bt "python3"
+ *
+ *   # Trace fio:
+ *   sudo bpftrace storage_latency_stack.bt "fio"
+ *
+ *   # Trace all processes (no argument; caution: noisy):
+ *   sudo bpftrace storage_latency_stack.bt
+ *
+ * Optional device filter (block layer only):
+ *
+ *   # Get device number: lsblk -no MAJ:MIN /dev/nvme5n1 → 259:5
+ *   # Convert: python3 -c "print(hex(259 << 20 | 5))" → 0x10300005
+ *   sudo BPFTRACE_DEV=0x10300005 bpftrace storage_latency_stack.bt "kv-cache"
+ *
+ * Requirements:
+ *   - Linux 5.x+ with BTF support
+ *   - bpftrace 0.16+ (sudo apt install bpftrace)
+ *   - CAP_BPF or root
+ *
+ * Author: Hazem Awadallah, Kingston Digital
+ * License: Apache-2.0, donated to MLCommons
+ */
+
+/* ====================================================================
+ * BLOCK LAYER: Queue → Dispatch → Complete
+ *
+ * These tracepoints fire for ALL block I/O on the system (or filtered
+ * by device). They measure the kernel block layer, below the filesystem
+ * and above the device driver.
+ * + * Timeline per I/O request: + * + * block_rq_insert ─── Q2D ─── block_rq_issue ─── D2C ─── block_rq_complete + * (entered I/O (dispatched to (device finished) + * scheduler queue) NVMe/SCSI driver) + * + * Q2D = time waiting in I/O scheduler (noop/mq-deadline/kyber/bfq) + * D2C = time the physical device took to complete the command + * + * For NVMe, D2C includes: + * - NVMe submission queue → controller processing → completion queue + * - For PRP drives: host PRP list construction overhead + * - For drives with small MDTS: each split command measured individually + * + * ==================================================================== */ + +tracepoint:block:block_rq_insert +{ + @q[args->dev, args->sector] = nsecs; +} + +/* Handle device-mapper and md remaps — follow the I/O to its final device */ +tracepoint:block:block_rq_remap +{ + if (@q[args->old_dev, args->old_sector]) { + @q[args->dev, args->sector] = @q[args->old_dev, args->old_sector]; + delete(@q[args->old_dev, args->old_sector]); + } +} + +/* I/O dispatched to device driver — start of D2C. + * + * NVMe with blk-mq and 'none' scheduler uses direct dispatch: + * I/Os go straight to block_rq_issue without block_rq_insert. + * D2C, bssplit, and QD are measured unconditionally. + * Q2D is measured separately only when block_rq_insert fired. + * + * Split into two probes to stay within bpftrace 0.14 verifier limits. 
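+ *
+ * Quick sanity check before a long run (an illustrative one-liner,
+ * not part of this script): confirm block-layer dispatches are
+ * visible at all by counting block_rq_issue events for a few seconds:
+ *
+ *   sudo bpftrace -e 'tracepoint:block:block_rq_issue { @issued = count(); }'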
*/ + +/* Q2D: only fires when block_rq_insert populated @q */ +tracepoint:block:block_rq_issue +/@q[args->dev, args->sector]/ +{ + $lat = (nsecs - @q[args->dev, args->sector]) / 1000; + if (((uint8 *)args->rwbs)[0] == 82) { + @q2d_read_us = hist($lat); + } else if (((uint8 *)args->rwbs)[0] == 87) { + @q2d_write_us = hist($lat); + } + delete(@q[args->dev, args->sector]); +} + +/* D2C start + bssplit + QD: fires on ALL dispatched I/Os */ +tracepoint:block:block_rq_issue +{ + @d[args->dev, args->sector] = nsecs; + + if (((uint8 *)args->rwbs)[0] == 82) { + @bssplit_read_kb = hist(args->bytes / 1024); + @inflight_read++; + @qd_read = hist(@inflight_read); + } else if (((uint8 *)args->rwbs)[0] == 87) { + @bssplit_write_kb = hist(args->bytes / 1024); + @inflight_write++; + @qd_write = hist(@inflight_write); + } +} + +/* LBA heatmap: where on the device is I/O landing? + * Sector >> 21 converts 512-byte sectors to GB offset. + * lhist gives linear 10 GB buckets up to 8 TB — shows spatial I/O pattern. + * Clustered = narrow band; random = spread; sequential = progression. */ +tracepoint:block:block_rq_issue +{ + $gb = args->sector >> 21; + if (((uint8 *)args->rwbs)[0] == 82) { + @lba_read_gb = lhist($gb, 0, 8000, 10); + } else if (((uint8 *)args->rwbs)[0] == 87) { + @lba_write_gb = lhist($gb, 0, 8000, 10); + } +} + +/* I/O completed by device — end of D2C, decrement in-flight counters */ +tracepoint:block:block_rq_complete +{ + if (@d[args->dev, args->sector]) { + $lat = (nsecs - @d[args->dev, args->sector]) / 1000; + + if (((uint8 *)args->rwbs)[0] == 82) { + @d2c_read_us = hist($lat); + @inflight_read--; + } else if (((uint8 *)args->rwbs)[0] == 87) { + @d2c_write_us = hist($lat); + @inflight_write--; + } + + delete(@d[args->dev, args->sector]); + } +} + + +/* ==================================================================== + * VFS LAYER: Application-visible read/write latency + * + * Measures the full syscall time from the application's perspective. 
+ * Includes: page cache lookup, filesystem journaling, block I/O,
+ * and data copy to/from userspace.
+ *
+ * Bimodal pattern expected under correct benchmark behavior:
+ *   - Fast peak (µs): page cache hits (buffered writes, cached reads)
+ *   - Slow peak (ms): actual device I/O (fsync flushes, cache misses)
+ *
+ * If the slow peak is MISSING, the benchmark is not hitting storage!
+ * Check that posix_fadvise(DONTNEED) or O_DIRECT is being used.
+ *
+ * Filtered by process name via the first positional parameter ($1);
+ * an empty string matches all processes.
+ * ==================================================================== */
+
+kprobe:vfs_read
+/comm == str($1) || str($1) == ""/
+{
+    @rs[tid] = nsecs;
+}
+
+kretprobe:vfs_read
+/@rs[tid]/
+{
+    @vfs_read_us = hist((nsecs - @rs[tid]) / 1000);
+    delete(@rs[tid]);
+}
+
+kprobe:vfs_write
+/comm == str($1) || str($1) == ""/
+{
+    @ws[tid] = nsecs;
+}
+
+kretprobe:vfs_write
+/@ws[tid]/
+{
+    $lat = (nsecs - @ws[tid]) / 1000;
+    @vfs_write_us = hist($lat);
+    delete(@ws[tid]);
+}
+
+
+/* ====================================================================
+ * SERIALIZATION GAP: write() → fsync()
+ *
+ * Measures the CPU time between a write() returning and the next
+ * fsync() call on the same thread. This gap captures:
+ *
+ *   - NumPy/Python serialization overhead
+ *   - Lock acquisition and GIL contention
+ *   - Memory allocation and buffer management
+ *   - Application-level bookkeeping (stats, logging)
+ *
+ * In KV cache workloads, this is "host serialization latency" —
+ * typically 30-50% of total per-request time.
+ *
+ * Also measures fsync() itself — the actual device flush.
+ * + * Combined interpretation: + * write_to_fsync + fsync ≈ total per-entry write cost + * If write_to_fsync >> fsync: CPU-bound, faster storage won't help + * If fsync >> write_to_fsync: device-bound, storage upgrade helps + * ==================================================================== */ + +tracepoint:syscalls:sys_enter_write +/comm == str($1) || str($1) == ""/ +{ + @write_enter[tid] = nsecs; +} + +tracepoint:syscalls:sys_exit_write +/@write_enter[tid]/ +{ + @write_done[tid] = nsecs; + delete(@write_enter[tid]); +} + +tracepoint:syscalls:sys_enter_fsync +/comm == str($1) || str($1) == ""/ +{ + /* Measure gap from last write() completion to this fsync() entry */ + if (@write_done[tid]) { + @write_to_fsync_us = hist((nsecs - @write_done[tid]) / 1000); + delete(@write_done[tid]); + } + @fsync_enter[tid] = nsecs; +} + +tracepoint:syscalls:sys_exit_fsync +/@fsync_enter[tid]/ +{ + @fsync_us = hist((nsecs - @fsync_enter[tid]) / 1000); + delete(@fsync_enter[tid]); +} + + +/* ==================================================================== + * READ PATH: fadvise → read gap (optional, for cache invalidation) + * + * If the benchmark uses posix_fadvise(DONTNEED) to drop page cache + * before reads, this measures the gap. A non-zero gap means the + * application is doing work between cache invalidation and read. 
+ * ==================================================================== */
+
+tracepoint:syscalls:sys_exit_fadvise64
+/comm == str($1) || str($1) == ""/
+{
+    @fadvise_done[tid] = nsecs;
+}
+
+tracepoint:syscalls:sys_enter_read
+/(comm == str($1) || str($1) == "") && @fadvise_done[tid]/
+{
+    if (@fadvise_done[tid]) {
+        @fadvise_to_read_us = hist((nsecs - @fadvise_done[tid]) / 1000);
+        delete(@fadvise_done[tid]);
+    }
+}
+
+
+/* ====================================================================
+ * CLEANUP: Remove incomplete entries on exit
+ * ==================================================================== */
+
+/* END block removed for bpftrace 0.14 compatibility.
+ * bpftrace automatically prints all maps on SIGINT.
+ * The @q, @d, @rs etc. maps will appear as empty hashes in output;
+ * the parser ignores them (no histogram buckets). */
diff --git a/kv_cache_benchmark/utils/storage_latency_stack.sh b/kv_cache_benchmark/utils/storage_latency_stack.sh
new file mode 100755
index 00000000..c04aeea4
--- /dev/null
+++ b/kv_cache_benchmark/utils/storage_latency_stack.sh
@@ -0,0 +1,107 @@
+#!/bin/bash
+# storage_latency_stack.sh — Full-Stack Storage Latency Diagnostic
+# Author: Hazem Awadallah, Kingston Digital
+# License: Apache-2.0, donated to MLCommons
+#
+# Decomposes I/O latency across every layer of the Linux storage stack:
+#   VFS → Filesystem → Block Layer (Q2D) → Device (D2C)
+#
+# Plus serialization gap analysis (write→fsync) for CPU vs device bottleneck,
+# block size distribution (bssplit), queue depth, and LBA heatmap.
+# +# Usage: +# sudo ./storage_latency_stack.sh kv-cache # trace kv-cache process +# sudo ./storage_latency_stack.sh vllm # trace vllm +# sudo ./storage_latency_stack.sh python3 # trace python3 +# sudo ./storage_latency_stack.sh "" # trace all (noisy) +# +# sudo ./storage_latency_stack.sh vllm --fio # trace + generate fio workload +# sudo ./storage_latency_stack.sh llm-d --fio # trace llm-d + generate fio +# +# Output (on Ctrl-C): +# Histograms printed to stdout. +# With --fio: also generates a fio .ini workload file from the trace data. +# +# Diagnosing: +# D2C >> Q2D → Device bottleneck (NAND, MDTS, interface) +# Q2D >> D2C → I/O scheduler contention +# VFS >> Q2D + D2C → Filesystem/serialization overhead +# write_to_fsync >> fsync → CPU-bound, faster storage won't help +# fsync >> write_to_fsync → Device-bound, storage upgrade helps +# All VFS reads in us → Page cache hit, NOT testing storage! + +set -euo pipefail + +COMM="" +GEN_FIO=0 + +# Parse arguments +for arg in "$@"; do + if [ "$arg" = "--fio" ]; then + GEN_FIO=1 + elif [ -z "$COMM" ]; then + COMM="$arg" + fi +done + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +BT_SCRIPT="${SCRIPT_DIR}/storage_latency_stack.bt" +DISTILL_SCRIPT="${SCRIPT_DIR}/distill_fio.py" + +if [ ! -f "$BT_SCRIPT" ]; then + echo "Error: $BT_SCRIPT not found" >&2 + exit 1 +fi + +if [ "$(id -u)" -ne 0 ]; then + echo "Error: must run as root (sudo)" >&2 + exit 1 +fi + +if ! command -v bpftrace &>/dev/null; then + echo "Error: bpftrace not found. Install: sudo apt install bpftrace" >&2 + exit 1 +fi + +if [ -n "$COMM" ]; then + echo "Tracing process: $COMM" + echo "Block layer: all devices (unfiltered)" +else + echo "Tracing ALL processes (block + VFS)" + echo "Warning: this will be noisy. Consider filtering by process name." +fi + +if [ "$GEN_FIO" -eq 1 ]; then + echo "fio generation: enabled (will save .ini after Ctrl-C)" +fi + +echo "Press Ctrl-C to stop and print histograms." 
+echo "---"
+
+if [ "$GEN_FIO" -eq 1 ]; then
+    # Capture bpftrace output to a temp file
+    TRACE_OUTPUT=$(mktemp /tmp/bpftrace_trace_XXXXXXXX.txt)
+
+    # Ctrl-C must stop bpftrace without killing this wrapper: install a
+    # no-op INT handler so the script survives to display the histograms
+    # and run the fio distillation. A caught (non-ignored) handler is
+    # reset on exec, so the bpftrace child still receives the SIGINT.
+    trap ':' INT
+
+    # Run bpftrace; redirect all output to file
+    # bpftrace prints histograms on SIGINT before exiting
+    bpftrace "$BT_SCRIPT" "$COMM" > "$TRACE_OUTPUT" 2>&1 || true
+
+    # Display the histograms
+    cat "$TRACE_OUTPUT"
+
+    echo ""
+    echo "=== Distilling fio workload from trace data ==="
+
+    if [ -f "$DISTILL_SCRIPT" ] && command -v python3 &>/dev/null; then
+        python3 "$DISTILL_SCRIPT" --input "$TRACE_OUTPUT" --process "$COMM"
+    else
+        echo "Warning: python3 or distill_fio.py not found. Skipping fio generation."
+        echo "Trace data saved to: $TRACE_OUTPUT"
+        echo "Manual: python3 distill_fio.py -i $TRACE_OUTPUT --process $COMM"
+        exit 0
+    fi
+
+    rm -f "$TRACE_OUTPUT"
+else
+    exec bpftrace "$BT_SCRIPT" "$COMM"
+fi
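The `--fio` path hands bpftrace's textual `hist()` output to `distill_fio.py`. As a rough sketch of what that distillation has to consume (this is an illustrative parser written for this note, not the actual `distill_fio.py` implementation, whose format handling may differ), one bucket line such as `[4, 8)   123 |@@@@   |` can be reduced to a `(lo, hi, count)` triple:

```python
import re

# Hypothetical bucket parser for bpftrace hist() output lines like:
#   "[4, 8)              123 |@@@@                |"
#   "[1K, 2K)             17 |@                   |"
# Not the real distill_fio.py parser — a sketch of the idea only.
BUCKET_RE = re.compile(r"^\[(\d+)(K|M|G)?,\s*(\d+)(K|M|G)?\)\s+(\d+)\s+\|")

_SCALE = {None: 1, "K": 1024, "M": 1024**2, "G": 1024**3}

def parse_bucket(line):
    """Return (lo, hi, count) for a histogram bucket line, else None."""
    m = BUCKET_RE.match(line.strip())
    if not m:
        return None  # header lines like "@vfs_read_us:" fall through here
    lo, lo_suffix, hi, hi_suffix, count = m.groups()
    return (int(lo) * _SCALE[lo_suffix],
            int(hi) * _SCALE[hi_suffix],
            int(count))

if __name__ == "__main__":
    print(parse_bucket("[4, 8)              123 |@@@@                |"))
    print(parse_bucket("[1K, 2K)             17 |@                   |"))
```

A real distiller would aggregate these triples per map (`@bssplit_read_kb`, `@d2c_read_us`, etc.) before emitting an fio job file.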
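The "Diagnosing" table in the script header reduces to threshold comparisons between layer latencies. A minimal sketch of those rules, assuming a 2x ratio as the cutoff for "much greater than" (the cutoff is this note's assumption, not something the tooling prescribes):

```python
# Hypothetical classifiers mirroring the script header's diagnosing rules.
# The 2x "much greater than" threshold is an assumption for illustration.

def classify_block_layer(q2d_us, d2c_us):
    """Q2D vs D2C: scheduler contention vs device bottleneck."""
    if d2c_us > 2 * q2d_us:
        return "device bottleneck (NAND, MDTS, interface)"
    if q2d_us > 2 * d2c_us:
        return "I/O scheduler contention"
    return "balanced"

def classify_write_path(write_to_fsync_us, fsync_us):
    """Serialization gap vs flush time: CPU-bound vs device-bound."""
    if write_to_fsync_us > 2 * fsync_us:
        return "CPU-bound: faster storage won't help"
    if fsync_us > 2 * write_to_fsync_us:
        return "device-bound: storage upgrade helps"
    return "mixed"
```

Feeding median latencies from the histograms into these functions gives a first-order verdict; borderline ratios deserve a look at the full distributions rather than a single summary statistic.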