From 97e6fca1729cc7dcaad2b82698730cf991e241a9 Mon Sep 17 00:00:00 2001 From: Sean Koval Date: Sat, 14 Feb 2026 03:43:03 -0500 Subject: [PATCH] bench(py): add per-function data-processing metrics and baseline workflow --- README.md | 3 + benchmarks/data_processing/README.md | 38 +++ benchmarks/data_processing/latest.json | 75 +++++ docs/python_bindings.md | 1 + justfile | 6 + .../benchmarks/benchmark_data_processing.py | 306 +++++++++++++++--- 6 files changed, 386 insertions(+), 43 deletions(-) create mode 100644 benchmarks/data_processing/README.md create mode 100644 benchmarks/data_processing/latest.json diff --git a/README.md b/README.md index 27d0dd3..58b1740 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,9 @@ cargo run -p openquant --example research_notebook_smoke # Python pipeline micro-benchmark (for speed demos) uv run --python .venv/bin/python python python/benchmarks/benchmark_pipeline.py --iterations 30 --bars 2048 + +# Python data-processing benchmark (per-function throughput/latency + JSON output) +uv run --python .venv/bin/python python python/benchmarks/benchmark_data_processing.py --rows-per-symbol 200000 --symbols 4 --iterations 7 --out benchmarks/data_processing/latest.json ``` ## Crate Layout diff --git a/benchmarks/data_processing/README.md b/benchmarks/data_processing/README.md new file mode 100644 index 0000000..b5688e9 --- /dev/null +++ b/benchmarks/data_processing/README.md @@ -0,0 +1,38 @@ +# Data Processing Benchmarks + +This folder tracks Python-facing `openquant.data` performance metrics. + +## Files + +- `latest.json`: most recent benchmark run output. +- `baseline.json`: pinned comparison baseline for regression checks. + +## Generate Latest Metrics + +```bash +just py-bench-data +``` + +or directly: + +```bash +uv run --python .venv/bin/python python python/benchmarks/benchmark_data_processing.py \ + --rows-per-symbol 200000 \ + --symbols 4 \ + --iterations 7 \ + --out benchmarks/data_processing/latest.json +``` + +## Compare Against Baseline + +```bash +just py-bench-data-compare +``` + +## Refresh Baseline + +After reviewing and accepting new performance: + +```bash +cp benchmarks/data_processing/latest.json benchmarks/data_processing/baseline.json +``` diff --git a/benchmarks/data_processing/latest.json b/benchmarks/data_processing/latest.json new file mode 100644 index 0000000..3a3d6ec --- /dev/null +++ b/benchmarks/data_processing/latest.json @@ -0,0 +1,75 @@ +{ + "dataset": { + "rows_per_symbol": 100000, + "symbols": 4, + "total_rows": 400000, + "estimated_bytes": 20800000 + }, + "iterations": 5, + "functions": [ + { + "name": "clean_ohlcv", + "rows": 400000, + "bytes_estimate": 20800000, + "iterations": 5, + "mean_ms": 431.8176897984813, + "p50_ms": 427.49624899443006, + "p95_ms": 430.2287489990704, + "std_ms": 8.461222812078079, + "rows_per_sec": 926316.8449321059, + "mb_per_sec": 48.168475936469505 + }, + { + "name": "data_quality_report", + "rows": 400000, + "bytes_estimate": 20800000, + "iterations": 5, + "mean_ms": 202.2392454004148, + "p50_ms": 202.07314000435872, + "p95_ms": 205.02412800124148, + "std_ms": 2.602492864446543, + "rows_per_sec": 1977855.481056792, + "mb_per_sec": 102.84848501495318 + }, + { + "name": "align_calendar", + "rows": 400000, + "bytes_estimate": 24000000, + "iterations": 5, + "mean_ms": 487.6979159977054, + "p50_ms": 484.19221499352716, + "p95_ms": 485.0531679985579, + "std_ms": 8.35346004858996, + "rows_per_sec": 820179.8426423499, + "mb_per_sec": 49.21079055854099 + }, + { + "name": "load_ohlcv_csv", + "rows": 
400000, + "bytes_estimate": 28228821, + "iterations": 5, + "mean_ms": 234.64987420011312, + "p50_ms": 232.7873370013549, + "p95_ms": 235.60187700059032, + "std_ms": 6.00703020230825, + "rows_per_sec": 1704667.4385125546, + "mb_per_sec": 120.30187996574853 + }, + { + "name": "load_ohlcv_parquet", + "rows": 400000, + "bytes_estimate": 2243334, + "iterations": 5, + "mean_ms": 458.6696180005674, + "p50_ms": 461.2510000006296, + "p95_ms": 468.5403839976061, + "std_ms": 11.962792657264071, + "rows_per_sec": 872087.4117271599, + "mb_per_sec": 4.890958354248841 + } + ], + "tables": { + "core": "name rows mean_ms p95_ms rows/s MB/s\n------------------------------------------------------------------------------------\nclean_ohlcv 400,000 431.82 430.23 926.32K/s 48.2\ndata_quality_report 400,000 202.24 205.02 1.98M/s 102.8\nalign_calendar 400,000 487.70 485.05 820.18K/s 49.2", + "io": "name rows mean_ms p95_ms rows/s MB/s\n------------------------------------------------------------------------------------\nload_ohlcv_csv 400,000 234.65 235.60 1.70M/s 120.3\nload_ohlcv_parquet 400,000 458.67 468.54 872.09K/s 4.9" + } +} \ No newline at end of file diff --git a/docs/python_bindings.md b/docs/python_bindings.md index 354d43b..1241e78 100644 --- a/docs/python_bindings.md +++ b/docs/python_bindings.md @@ -22,6 +22,7 @@ Quick performance showcase: ```bash uv run --python .venv/bin/python python python/benchmarks/benchmark_pipeline.py --iterations 30 --bars 2048 +uv run --python .venv/bin/python python python/benchmarks/benchmark_data_processing.py --rows-per-symbol 200000 --symbols 4 --iterations 7 --out benchmarks/data_processing/latest.json ``` Build a wheel: diff --git a/justfile b/justfile index e576e48..20fdd60 100644 --- a/justfile +++ b/justfile @@ -65,6 +65,12 @@ py-setup: py-bench: uv run --python .venv/bin/python python python/benchmarks/benchmark_pipeline.py --iterations 30 --bars 2048 +py-bench-data: + uv run --python .venv/bin/python python python/benchmarks/benchmark_data_processing.py --rows-per-symbol 200000 --symbols 4 --iterations 7 --out benchmarks/data_processing/latest.json + +py-bench-data-compare: + uv run --python .venv/bin/python python python/benchmarks/benchmark_data_processing.py --rows-per-symbol 200000 --symbols 4 --iterations 7 --out benchmarks/data_processing/latest.json --baseline benchmarks/data_processing/baseline.json + exp-run: uv run --python .venv/bin/python python experiments/run_pipeline.py --config experiments/configs/futures_oil_baseline.toml --out experiments/artifacts diff --git a/python/benchmarks/benchmark_data_processing.py b/python/benchmarks/benchmark_data_processing.py index 03c1d40..c5301dc 100644 --- a/python/benchmarks/benchmark_data_processing.py +++ b/python/benchmarks/benchmark_data_processing.py @@ -2,25 +2,161 @@ import argparse import json +import statistics import time +from dataclasses import dataclass +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import Any, Callable import polars as pl import openquant +@dataclass(frozen=True) +class BenchStats: + name: str + rows: int + bytes_estimate: int + iterations: int + mean_ms: float + p50_ms: float + p95_ms: float + std_ms: float + rows_per_sec: float + mb_per_sec: float + + def as_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "rows": self.rows, + "bytes_estimate": self.bytes_estimate, + "iterations": self.iterations, + "mean_ms": self.mean_ms, + "p50_ms": self.p50_ms, + "p95_ms": self.p95_ms, + "std_ms": self.std_ms, + "rows_per_sec": 
self.rows_per_sec, + "mb_per_sec": self.mb_per_sec, + } + + +def _estimate_df_bytes(df: pl.DataFrame) -> int: + if df.height == 0: + return 0 + # Keep this cheap and deterministic for repeated benchmark runs. + return int(df.estimated_size()) + + +def _measure( + name: str, + rows: int, + bytes_estimate: int, + iterations: int, + fn: Callable[[], Any], +) -> BenchStats: + timings: list[float] = [] + for _ in range(iterations): + t0 = time.perf_counter() + fn() + timings.append(time.perf_counter() - t0) + timings.sort() + mean_s = statistics.mean(timings) + p50_s = timings[len(timings) // 2] + p95_s = timings[max(int(len(timings) * 0.95) - 1, 0)] + std_s = statistics.pstdev(timings) if len(timings) > 1 else 0.0 + return BenchStats( + name=name, + rows=rows, + bytes_estimate=bytes_estimate, + iterations=iterations, + mean_ms=mean_s * 1000.0, + p50_ms=p50_s * 1000.0, + p95_ms=p95_s * 1000.0, + std_ms=std_s * 1000.0, + rows_per_sec=(rows / mean_s) if mean_s > 0 else 0.0, + mb_per_sec=((bytes_estimate / 1_000_000.0) / mean_s) if mean_s > 0 else 0.0, + ) + + +def _format_rate(v: float) -> str: + if v >= 1_000_000_000: + return f"{v / 1_000_000_000:.2f}B/s" + if v >= 1_000_000: + return f"{v / 1_000_000:.2f}M/s" + if v >= 1_000: + return f"{v / 1_000:.2f}K/s" + return f"{v:.2f}/s" + + +def _format_table(stats: list[BenchStats]) -> str: + header = ( + "name".ljust(22) + + "rows".rjust(12) + + "mean_ms".rjust(12) + + "p95_ms".rjust(12) + + "rows/s".rjust(14) + + "MB/s".rjust(12) + ) + lines = [header, "-" * len(header)] + for s in stats: + lines.append( + s.name.ljust(22) + + f"{s.rows:,}".rjust(12) + + f"{s.mean_ms:,.2f}".rjust(12) + + f"{s.p95_ms:,.2f}".rjust(12) + + _format_rate(s.rows_per_sec).rjust(14) + + f"{s.mb_per_sec:,.1f}".rjust(12) + ) + return "\n".join(lines) + + +def _compare_against_baseline( + current: list[BenchStats], + baseline_path: Path, +) -> list[dict[str, Any]]: + if not baseline_path.exists(): + return [] + baseline = json.loads(baseline_path.read_text(encoding="utf-8")) + base_map = { + item["name"]: item + for item in baseline.get("functions", []) + if isinstance(item, dict) and "name" in item + } + rows: list[dict[str, Any]] = [] + for cur in current: + prev = base_map.get(cur.name) + if not prev: + continue + prev_mean = float(prev.get("mean_ms", 0.0)) + if prev_mean <= 0: + continue + regression_pct = ((cur.mean_ms - prev_mean) / prev_mean) * 100.0 + rows.append( + { + "name": cur.name, + "baseline_mean_ms": prev_mean, + "current_mean_ms": cur.mean_ms, + "regression_pct": regression_pct, + } + ) + return rows + + def make_dataset(rows_per_symbol: int, symbols: list[str]) -> pl.DataFrame: frames: list[pl.DataFrame] = [] + ts_base = pl.datetime_range( + start=pl.datetime(2020, 1, 1), + end=pl.datetime(2020, 1, 1) + pl.duration(minutes=rows_per_symbol - 1), + interval="1m", + eager=True, + ) for symbol in symbols: frames.append( pl.DataFrame( { - "ts": pl.datetime_range( - start=pl.datetime(2020, 1, 1), - end=pl.datetime(2020, 1, 1) + pl.duration(minutes=rows_per_symbol - 1), - interval="1m", - eager=True, - ), + "ts": ts_base, "symbol": [symbol] * rows_per_symbol, "open": pl.arange(0, rows_per_symbol, eager=True).cast(pl.Float64) + 100.0, "high": pl.arange(0, rows_per_symbol, eager=True).cast(pl.Float64) + 100.5, @@ -33,54 +169,138 @@ def make_dataset(rows_per_symbol: int, symbols: list[str]) -> pl.DataFrame: return pl.concat(frames, rechunk=True) -def main() -> None: - parser = argparse.ArgumentParser(description="Benchmark openquant.data throughput.") - 
parser.add_argument("--rows-per-symbol", type=int, default=200_000) - parser.add_argument("--symbols", type=int, default=4) - args = parser.parse_args() - - symbol_names = [f"SYM{i}" for i in range(args.symbols)] - base = make_dataset(args.rows_per_symbol, symbol_names) +def run_benchmarks(rows_per_symbol: int, symbols: int, iterations: int) -> dict[str, Any]: + symbol_names = [f"SYM{i}" for i in range(symbols)] + base = make_dataset(rows_per_symbol, symbol_names) total_rows = base.height + base_bytes = _estimate_df_bytes(base) # Warm-up to stabilize lazy-plan compile and allocation effects. - _ = openquant.data.clean_ohlcv(base) + clean_warm = openquant.data.clean_ohlcv(base) + _ = openquant.data.data_quality_report(base) + _ = openquant.data.align_calendar(clean_warm, interval="1m") - t0 = time.perf_counter() clean = openquant.data.clean_ohlcv(base) - t1 = time.perf_counter() + clean_rows = clean.height + clean_bytes = _estimate_df_bytes(clean) - t2 = time.perf_counter() - quality = openquant.data.data_quality_report(base) - t3 = time.perf_counter() + function_stats: list[BenchStats] = [ + _measure( + name="clean_ohlcv", + rows=total_rows, + bytes_estimate=base_bytes, + iterations=iterations, + fn=lambda: openquant.data.clean_ohlcv(base), + ), + _measure( + name="data_quality_report", + rows=total_rows, + bytes_estimate=base_bytes, + iterations=iterations, + fn=lambda: openquant.data.data_quality_report(base), + ), + _measure( + name="align_calendar", + rows=clean_rows, + bytes_estimate=clean_bytes, + iterations=iterations, + fn=lambda: openquant.data.align_calendar(clean, interval="1m"), + ), + ] - t4 = time.perf_counter() - aligned = openquant.data.align_calendar(clean, interval="1m") - t5 = time.perf_counter() + io_stats: list[BenchStats] = [] + with TemporaryDirectory(prefix="openquant_bench_") as tmp: + tmp_dir = Path(tmp) + csv_path = tmp_dir / "bench_ohlcv.csv" + pq_path = tmp_dir / "bench_ohlcv.parquet" + base.write_csv(csv_path) + base.write_parquet(pq_path) - print( - json.dumps( - { - "rows": total_rows, - "clean_rows": clean.height, - "aligned_rows": aligned.height, - "clean_seconds": t1 - t0, - "clean_rows_per_sec": total_rows / max(t1 - t0, 1e-9), - "quality_seconds": t3 - t2, - "quality_rows_per_sec": total_rows / max(t3 - t2, 1e-9), - "align_seconds": t5 - t4, - "align_rows_per_sec": clean.height / max(t5 - t4, 1e-9), - "quality_report": { - "row_count": quality["row_count"], - "symbol_count": quality["symbol_count"], - "duplicate_key_count": quality["duplicate_key_count"], - "gap_interval_count": quality["gap_interval_count"], - }, - }, - indent=2, + # Warm-up file paths. 
+ _ = openquant.data.load_ohlcv(csv_path) + _ = openquant.data.load_ohlcv(pq_path) + + io_stats.append( + _measure( + name="load_ohlcv_csv", + rows=total_rows, + bytes_estimate=csv_path.stat().st_size, + iterations=iterations, + fn=lambda: openquant.data.load_ohlcv(csv_path), + ) ) + io_stats.append( + _measure( + name="load_ohlcv_parquet", + rows=total_rows, + bytes_estimate=pq_path.stat().st_size, + iterations=iterations, + fn=lambda: openquant.data.load_ohlcv(pq_path), + ) + ) + + all_stats = function_stats + io_stats + return { + "dataset": { + "rows_per_symbol": rows_per_symbol, + "symbols": symbols, + "total_rows": total_rows, + "estimated_bytes": base_bytes, + }, + "iterations": iterations, + "functions": [s.as_dict() for s in all_stats], + "tables": { + "core": _format_table(function_stats), + "io": _format_table(io_stats), + }, + } + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Benchmark openquant.data with per-function throughput and latency metrics." + ) + parser.add_argument("--rows-per-symbol", type=int, default=200_000) + parser.add_argument("--symbols", type=int, default=4) + parser.add_argument("--iterations", type=int, default=7) + parser.add_argument("--out", type=Path, default=None) + parser.add_argument("--baseline", type=Path, default=None) + args = parser.parse_args() + + result = run_benchmarks( + rows_per_symbol=args.rows_per_symbol, + symbols=args.symbols, + iterations=args.iterations, ) + print("Core Data Functions") + print(result["tables"]["core"]) + print() + print("I/O + End-to-End Load") + print(result["tables"]["io"]) + print() + + if args.baseline is not None: + diff_rows = _compare_against_baseline( + [BenchStats(**row) for row in result["functions"]], + args.baseline, + ) + result["baseline_comparison"] = diff_rows + if diff_rows: + print("Baseline Comparison (mean_ms regression; negative is faster)") + for row in diff_rows: + print( + f"{row['name']}: baseline={row['baseline_mean_ms']:.2f}ms " + f"current={row['current_mean_ms']:.2f}ms " + f"regression_pct={row['regression_pct']:+.2f}%" + ) + print() + + if args.out is not None: + args.out.parent.mkdir(parents=True, exist_ok=True) + args.out.write_text(json.dumps(result, indent=2), encoding="utf-8") + print(f"wrote benchmark metrics to {args.out}") + if __name__ == "__main__": main()
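
The `baseline_comparison` rows that `--baseline` embeds in the output JSON (see `py-bench-data-compare`) are enough to drive an automated regression gate. A minimal sketch of such a consumer, assuming a hypothetical `check_regressions.py` helper and an illustrative 10% tolerance; neither the name nor the threshold comes from this patch:

```python
#!/usr/bin/env python
"""Fail CI when any benchmarked function regresses past a tolerance.

Reads the `baseline_comparison` block that benchmark_data_processing.py
writes when invoked with --baseline. The 10% threshold and this script's
name are illustrative assumptions, not part of the patch above.
"""
import json
import sys
from pathlib import Path

THRESHOLD_PCT = 10.0  # assumed tolerance; tune to the project's run-to-run noise


def main(path: str = "benchmarks/data_processing/latest.json") -> int:
    report = json.loads(Path(path).read_text(encoding="utf-8"))
    # `baseline_comparison` is only present when the benchmark ran with --baseline.
    failures = [
        row
        for row in report.get("baseline_comparison", [])
        if row["regression_pct"] > THRESHOLD_PCT
    ]
    for row in failures:
        print(
            f"REGRESSION {row['name']}: {row['baseline_mean_ms']:.2f}ms -> "
            f"{row['current_mean_ms']:.2f}ms ({row['regression_pct']:+.2f}%)",
            file=sys.stderr,
        )
    return 1 if failures else 0


if __name__ == "__main__":
    raise SystemExit(main())
```

A zero exit status means every benchmarked function stayed within tolerance, so a check like this could run directly after `just py-bench-data-compare` in CI.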