173 changes: 173 additions & 0 deletions scripts/lib_trend.py
@@ -0,0 +1,173 @@
"""
RunTrendAnalyzer — detect score regression across sequential benchmark runs.

Analyzes result JSON files written by benchmark.py to determine whether a model's
performance is improving, stable, or degrading over time, using an ordinary least
squares (OLS) slope fit over the most recent runs.
"""
import json
import logging
import statistics
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional

logger = logging.getLogger("benchmark")


@dataclass
class RunPoint:
"""A single data point from a benchmark run."""
run_id: str
timestamp: float
model: str
score_pct: float
task_count: int


@dataclass
class RunTrendReport:
"""Trend analysis report for a single model."""
model: str
run_count: int
window: int
slope: float
points: List[RunPoint]
regression_detected: bool
regression_threshold: float
task_count_varies: bool = False

def summary(self) -> str:
"""Return a CLI-friendly summary string."""
direction = (
"▼ REGRESSION"
if self.regression_detected
else "▲ improving"
if self.slope > 0
else "→ stable"
)
note = (
" ⚠ task count varied — slope may reflect suite changes"
if self.task_count_varies
else ""
)
return (
f"{direction}: {self.model} slope={self.slope:+.2f}%/run "
f"over last {self.run_count} runs "
f"(threshold={self.regression_threshold:+.2f}){note}"
)


class RunTrendAnalyzer:
"""Detect performance regression across sequential benchmark runs."""

def __init__(
self,
results_dir: Path,
window: int = 10,
regression_threshold: float = -0.5,
):
"""
Args:
results_dir: Directory containing benchmark result JSON files.
window: Number of most recent runs to analyze.
            regression_threshold: Slope (percentage points per run) below which a regression is flagged.
"""
self.results_dir = results_dir
self.window = window
self.regression_threshold = regression_threshold

def load_points(self, model: Optional[str] = None) -> Dict[str, List[RunPoint]]:
"""
Load and group RunPoint data from result JSON files, keyed by model slug.
Skips files that fail to parse (JSONDecodeError, OSError).
"""
grouped: Dict[str, List[RunPoint]] = {}
for path in sorted(self.results_dir.glob("*.json")):
try:
data = json.loads(path.read_text())
except (json.JSONDecodeError, OSError):
continue

m = data.get("model", "")
ts = data.get("timestamp", 0.0)
run_id = data.get("run_id", path.stem)
tasks = data.get("tasks", [])
if not tasks:
continue

total = sum(
t["grading"]["mean"]
for t in tasks
if "grading" in t
)
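            # A task without a "grading" block adds nothing to the numerator above but
            # still counts in len(tasks) below, so it effectively scores 0 for the run.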
score_pct = (total / len(tasks)) * 100

if model and m != model:
continue

grouped.setdefault(m, []).append(
RunPoint(run_id, ts, m, score_pct, len(tasks))
)

for pts in grouped.values():
pts.sort(key=lambda p: p.timestamp)

return grouped

def analyze(
self, model: Optional[str] = None
) -> List[RunTrendReport]:
"""
Run OLS slope analysis per model over the configured window.
Returns a list of RunTrendReport, sorted by slope ascending.
"""
grouped = self.load_points(model)
reports: List[RunTrendReport] = []

for m, pts in grouped.items():
window_pts = pts[-self.window:]
if len(window_pts) < 2:
continue

xs = list(range(len(window_pts)))
ys = [p.score_pct for p in window_pts]
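            # Ordinary least squares over run index vs. score:
            #   slope = Σ(x - x̄)(y - ȳ) / Σ(x - x̄)²
            # statistics.linear_regression (Python 3.10+) returns (slope, intercept);
            # slope is in percentage points per run.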
slope, intercept = statistics.linear_regression(xs, ys)

task_counts = {p.task_count for p in window_pts}
task_count_varies = len(task_counts) > 1

reports.append(
RunTrendReport(
model=m,
run_count=len(window_pts),
window=self.window,
slope=slope,
points=window_pts,
regression_detected=slope < self.regression_threshold,
regression_threshold=self.regression_threshold,
task_count_varies=task_count_varies,
)
)

reports.sort(key=lambda r: r.slope)
return reports

def run(self, model: Optional[str] = None) -> None:
"""CLI entry: analyze and print results."""
reports = self.analyze(model)
if not reports:
logger.info("No trend data available (need ≥2 runs per model).")
return

logger.info("\n" + "=" * 80)
logger.info("📈 RUN TREND ANALYSIS")
logger.info("=" * 80)

for report in reports:
logger.info(" %s", report.summary())

# Show recent scores
for p in report.points:
logger.info(" %s: %.1f%% (%d tasks)", p.run_id, p.score_pct, p.task_count)

logger.info("%s", "=" * 80)
160 changes: 160 additions & 0 deletions tests/test_lib_trend.py
@@ -0,0 +1,160 @@
"""Tests for lib_trend — RunTrendAnalyzer."""
import json
import tempfile
import time
from pathlib import Path
from unittest import TestCase

from scripts.lib_trend import RunTrendAnalyzer, RunPoint, RunTrendReport


class TestRunTrendAnalyzer(TestCase):
def _write_run(self, run_dir: Path, run_id: str, model: str, scores: list):
"""Helper: each score = one task's grading.mean in the result JSON."""
tasks = [
{"task_id": f"task_{i}", "grading": {"mean": s}}
for i, s in enumerate(scores)
]
data = {
"model": model,
"run_id": run_id,
"timestamp": time.time(),
"suite": "all",
"tasks": tasks,
}
(run_dir / f"{run_id}_{model.replace('/', '_')}.json").write_text(json.dumps(data))

def test_no_data_returns_empty(self):
with tempfile.TemporaryDirectory() as tmp:
analyzer = RunTrendAnalyzer(Path(tmp))
self.assertEqual(analyzer.analyze(), [])

def test_single_run_returns_empty(self):
"""Need >= 2 runs for trend analysis."""
with tempfile.TemporaryDirectory() as tmp:
run_dir = Path(tmp)
self._write_run(run_dir, "0001", "claude", [0.8])
analyzer = RunTrendAnalyzer(run_dir)
self.assertEqual(analyzer.analyze(), [])

def test_regression_detected(self):
with tempfile.TemporaryDirectory() as tmp:
run_dir = Path(tmp)
# Declining overall scores: 4 runs, single-task each
for i, score in enumerate([0.9, 0.85, 0.80, 0.75]):
self._write_run(run_dir, f"{i:04d}", "claude-sonnet", [score])
analyzer = RunTrendAnalyzer(run_dir, window=10, regression_threshold=-0.5)
reports = analyzer.analyze()
self.assertEqual(len(reports), 1)
self.assertTrue(reports[0].regression_detected)
self.assertLess(reports[0].slope, -0.5)

def test_improving_not_regression(self):
with tempfile.TemporaryDirectory() as tmp:
run_dir = Path(tmp)
for i, score in enumerate([0.75, 0.80, 0.85, 0.90]):
self._write_run(run_dir, f"{i:04d}", "gpt", [score])
analyzer = RunTrendAnalyzer(run_dir)
reports = analyzer.analyze()
self.assertEqual(len(reports), 1)
self.assertFalse(reports[0].regression_detected)
self.assertGreater(reports[0].slope, 0)

def test_malformed_file_skipped(self):
with tempfile.TemporaryDirectory() as tmp:
run_dir = Path(tmp)
(run_dir / "bad.json").write_text("{INVALID JSON!")
self._write_run(run_dir, "0001", "model-a", [0.8])
self._write_run(run_dir, "0002", "model-a", [0.9])
analyzer = RunTrendAnalyzer(run_dir)
reports = analyzer.analyze()
self.assertEqual(len(reports), 1)

def test_task_count_varies_flag(self):
"""Suite expansion across runs should set task_count_varies."""
with tempfile.TemporaryDirectory() as tmp:
run_dir = Path(tmp)
self._write_run(run_dir, "0001", "claude", [0.9]) # 1 task
self._write_run(run_dir, "0002", "claude", [0.85, 0.88, 0.90]) # 3 tasks
analyzer = RunTrendAnalyzer(run_dir)
reports = analyzer.analyze()
self.assertEqual(len(reports), 1)
self.assertTrue(reports[0].task_count_varies)

def test_task_count_varies_false_when_equal(self):
with tempfile.TemporaryDirectory() as tmp:
run_dir = Path(tmp)
self._write_run(run_dir, "0001", "claude", [0.9, 0.8])
self._write_run(run_dir, "0002", "claude", [0.85, 0.88])
analyzer = RunTrendAnalyzer(run_dir)
reports = analyzer.analyze()
self.assertEqual(len(reports), 1)
self.assertFalse(reports[0].task_count_varies)

def test_summary_string_regression(self):
report = RunTrendReport(
model="claude-sonnet",
run_count=5,
window=10,
slope=-1.2,
points=[],
regression_detected=True,
regression_threshold=-0.5,
task_count_varies=False,
)
summary = report.summary()
self.assertIn("REGRESSION", summary)
self.assertIn("-1.20", summary)

def test_summary_string_task_count_warning(self):
report = RunTrendReport(
model="gpt-4",
run_count=4,
window=10,
slope=-0.8,
points=[],
regression_detected=True,
regression_threshold=-0.5,
task_count_varies=True,
)
summary = report.summary()
self.assertIn("task count varied", summary)

def test_stable_scores(self):
with tempfile.TemporaryDirectory() as tmp:
run_dir = Path(tmp)
for i, score in enumerate([0.80, 0.80, 0.80, 0.80]):
self._write_run(run_dir, f"{i:04d}", "stable-model", [score])
analyzer = RunTrendAnalyzer(run_dir)
reports = analyzer.analyze()
self.assertEqual(len(reports), 1)
self.assertEqual(reports[0].slope, 0.0)
self.assertFalse(reports[0].regression_detected)

def test_multiple_models(self):
with tempfile.TemporaryDirectory() as tmp:
run_dir = Path(tmp)
self._write_run(run_dir, "0001", "claude", [0.9])
self._write_run(run_dir, "0002", "claude", [0.8])
self._write_run(run_dir, "0003", "gpt", [0.7])
self._write_run(run_dir, "0004", "gpt", [0.75])
analyzer = RunTrendAnalyzer(run_dir)
reports = analyzer.analyze()
self.assertEqual(len(reports), 2)
# Sorted by slope ascending
self.assertEqual(reports[0].model, "claude")
self.assertLess(reports[0].slope, 0)
self.assertEqual(reports[1].model, "gpt")
self.assertGreater(reports[1].slope, 0)

def test_filter_by_model(self):
with tempfile.TemporaryDirectory() as tmp:
run_dir = Path(tmp)
self._write_run(run_dir, "0001", "claude", [0.9])
self._write_run(run_dir, "0002", "claude", [0.8])
self._write_run(run_dir, "0003", "gpt", [0.7])
self._write_run(run_dir, "0004", "gpt", [0.75])
analyzer = RunTrendAnalyzer(run_dir)
reports = analyzer.analyze(model="claude")
self.assertEqual(len(reports), 1)
self.assertEqual(reports[0].model, "claude")
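The suite above can be run from the repository root with the standard unittest runner, e.g. python -m unittest tests.test_lib_trend -v (assuming scripts/ and tests/ resolve as importable packages from the root, as the import at the top of the test module expects).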