From e68a8cd90f73ddc5084e8244b8400e0f6ce76406 Mon Sep 17 00:00:00 2001 From: Joshua Date: Tue, 12 May 2026 10:51:59 -0400 Subject: [PATCH] feat(surrogate): GP/RF interpolation from results table (#82) Add a new `trade_study.surrogate` module that fits scikit-learn Gaussian-process or random-forest models to a ResultsTable and lets users predict any observable at unseen factor combinations. - `fit_surrogate(results, factors, *, method, seed, n_estimators)` returns a `SurrogateModel` carrying one estimator per observable. - Continuous factors are min-max scaled to [0,1]; categorical/discrete factors are one-hot encoded against `factor.levels`. - `SurrogateModel.predict` / `predict_batch` work for both backends; `uncertainty` returns the GP posterior std and raises NotImplementedError for RF. - NaN rows are dropped per-observable. Packaging: - New optional extra `surrogate = ["scikit-learn>=1.3"]`, also added to the `all` aggregate. Docs: - New `docs/api/surrogate.md` page wired into mkdocs nav. Tests: - 13 new tests covering fit/predict round-trip for GP and RF, batch shape, GP uncertainty, RF uncertainty raising, mixed-factor encoding, unknown level / missing factor errors, NaN row handling, and input validation. surrogate.py coverage: 100%. Closes #82. Unblocks #105 (regime-conditional surrogate). --- docs/api/surrogate.md | 14 ++ mkdocs.yml | 1 + pyproject.toml | 5 +- src/trade_study/__init__.py | 3 + src/trade_study/surrogate.py | 307 +++++++++++++++++++++++++++++++++++ tests/test_surrogate.py | 229 ++++++++++++++++++++++++++ 6 files changed, 558 insertions(+), 1 deletion(-) create mode 100644 docs/api/surrogate.md create mode 100644 src/trade_study/surrogate.py create mode 100644 tests/test_surrogate.py diff --git a/docs/api/surrogate.md b/docs/api/surrogate.md new file mode 100644 index 0000000..25fa751 --- /dev/null +++ b/docs/api/surrogate.md @@ -0,0 +1,14 @@ +# Surrogate + +Cheap regression surrogates fit over a [`ResultsTable`][trade_study.ResultsTable] +for predicting observables at untested configurations. + +Install via the optional extra: + +```bash +uv pip install 'trade-study[surrogate]' +``` + +::: trade_study.fit_surrogate + +::: trade_study.SurrogateModel diff --git a/mkdocs.yml b/mkdocs.yml index 7a98342..e1ffbd6 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -74,5 +74,6 @@ nav: - Scoring: api/scoring.md - Pareto: api/pareto.md - Stacking: api/stacking.md + - Surrogate: api/surrogate.md - Visualization: api/viz.md - I/O: api/io.md diff --git a/pyproject.toml b/pyproject.toml index d8b5642..f0bc94d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,8 +73,11 @@ parallel = [ viz = [ "matplotlib>=3.6", ] +surrogate = [ + "scikit-learn>=1.3", +] all = [ - "trade-study[scoring,pareto,stacking,design,adaptive,parallel,viz]", + "trade-study[scoring,pareto,stacking,design,adaptive,parallel,viz,surrogate]", ] examples = [ "scikit-learn>=1.3", diff --git a/src/trade_study/__init__.py b/src/trade_study/__init__.py index b6aa241..0e1e514 100644 --- a/src/trade_study/__init__.py +++ b/src/trade_study/__init__.py @@ -35,6 +35,7 @@ top_k_pareto_filter, weighted_sum_filter, ) +from .surrogate import SurrogateModel, fit_surrogate from .viz import plot_calibration, plot_front, plot_parallel, plot_scores __all__ = [ @@ -51,6 +52,7 @@ "Scorer", "Simulator", "Study", + "SurrogateModel", "TrialResult", "__version__", "build_grid", @@ -58,6 +60,7 @@ "ensemble_predict", "extract_front", "feasibility_filter", + "fit_surrogate", "hypervolume", "igd_plus", "load_results", diff --git a/src/trade_study/surrogate.py b/src/trade_study/surrogate.py new file mode 100644 index 0000000..a4b2593 --- /dev/null +++ b/src/trade_study/surrogate.py @@ -0,0 +1,307 @@ +"""Surrogate models over the results table (#82). + +Fit a cheap regression model to a :class:`ResultsTable` so observables +can be predicted at untested configurations. Two backends: + +- ``"gp"``: scikit-learn :class:`GaussianProcessRegressor` per observable + with a Matern(1.5) + WhiteKernel; provides predictive standard + deviations via :meth:`SurrogateModel.uncertainty`. +- ``"rf"``: scikit-learn :class:`RandomForestRegressor` per observable; + fast, handles non-stationary surfaces, but does not expose calibrated + uncertainties through :meth:`SurrogateModel.uncertainty`. + +Categorical and discrete factors are one-hot encoded; continuous factors +are min-max scaled to ``[0, 1]`` using their declared bounds. The fitted +model is independent per observable column. + +Optional dependency: install via the ``trade-study[surrogate]`` extra. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any + +import numpy as np + +from .design import Factor, FactorType + +if TYPE_CHECKING: + from collections.abc import Sequence + + from numpy.typing import NDArray + + from .protocols import ResultsTable + + +_SUPPORTED_METHODS: frozenset[str] = frozenset({"gp", "rf"}) + + +@dataclass(frozen=True) +class _FactorEncoder: + """Encodes a list of factors into a numeric design matrix. + + Continuous factors are min-max scaled to ``[0, 1]``. Categorical and + discrete factors are one-hot encoded with a stable level ordering + taken from each factor's ``levels`` attribute. + + Attributes: + factors: Ordered list of factors used for encoding. + column_names: Flat list of encoded column names (for debugging / + feature-importance inspection). + """ + + factors: list[Factor] + column_names: list[str] = field(default_factory=list) + + @classmethod + def from_factors(cls, factors: list[Factor]) -> _FactorEncoder: + """Build an encoder with column names derived from ``factors``. + + Args: + factors: Factor list with bounds (continuous) or levels + (categorical/discrete) populated. + + Returns: + A new :class:`_FactorEncoder`. + """ + cols: list[str] = [] + for f in factors: + if f.factor_type == FactorType.CONTINUOUS: + cols.append(f.name) + else: + assert f.levels is not None # noqa: S101 -- enforced by Factor + cols.extend(f"{f.name}={lvl!r}" for lvl in f.levels) + return cls(factors=factors, column_names=cols) + + def transform( + self, + configs: Sequence[dict[str, Any]], + ) -> NDArray[np.float64]: + """Encode configs as a 2-D numeric design matrix. + + Args: + configs: Sequence of factor-keyed config dicts. + + Returns: + Array of shape ``(len(configs), len(column_names))``. + + Raises: + KeyError: If a config is missing a required factor. + ValueError: If a categorical/discrete value is not one of the + factor's declared levels. + """ + rows: list[list[float]] = [] + for cfg in configs: + row: list[float] = [] + for f in self.factors: + if f.name not in cfg: + msg = f"config is missing factor {f.name!r}" + raise KeyError(msg) + if f.factor_type == FactorType.CONTINUOUS: + assert f.bounds is not None # noqa: S101 -- enforced + lo, hi = f.bounds + row.append((float(cfg[f.name]) - lo) / (hi - lo)) + else: + assert f.levels is not None # noqa: S101 -- enforced + value = cfg[f.name] + if value not in f.levels: + msg = ( + f"config value {value!r} for factor {f.name!r} " + f"is not in declared levels {f.levels}" + ) + raise ValueError(msg) + row.extend(1.0 if value == lvl else 0.0 for lvl in f.levels) + rows.append(row) + return np.asarray(rows, dtype=np.float64) + + +@dataclass +class SurrogateModel: + """Fitted surrogate over a :class:`ResultsTable`. + + Use :func:`fit_surrogate` to construct one. Per-observable backend + estimators are stored in ``models``; encoding is shared across them. + + Attributes: + method: ``"gp"`` or ``"rf"``. + encoder: Factor encoder used at fit time. + observable_names: Column names of the predicted observables. + models: One fitted scikit-learn estimator per observable. + """ + + method: str + encoder: _FactorEncoder + observable_names: list[str] + models: list[Any] + + def predict(self, config: dict[str, Any]) -> dict[str, float]: + """Predict observables for a single config. + + Args: + config: Factor-keyed config dict. + + Returns: + Mapping from observable name to predicted scalar. + """ + x = self.encoder.transform([config]) + return { + name: float(model.predict(x)[0]) + for name, model in zip(self.observable_names, self.models, strict=True) + } + + def predict_batch( + self, + configs: Sequence[dict[str, Any]], + ) -> dict[str, NDArray[np.float64]]: + """Predict observables for a batch of configs. + + Args: + configs: Sequence of factor-keyed config dicts. + + Returns: + Mapping from observable name to a length-``len(configs)`` + array of predictions. + """ + x = self.encoder.transform(configs) + return { + name: np.asarray(model.predict(x), dtype=np.float64) + for name, model in zip(self.observable_names, self.models, strict=True) + } + + def uncertainty(self, config: dict[str, Any]) -> dict[str, float]: + """Predictive standard deviation per observable (GP only). + + Args: + config: Factor-keyed config dict. + + Returns: + Mapping from observable name to predictive standard deviation. + + Raises: + NotImplementedError: If the backend does not expose calibrated + uncertainties (currently anything other than ``"gp"``). + """ + if self.method != "gp": + msg = ( + f"uncertainty() is only supported for method='gp'; " + f"this surrogate uses method={self.method!r}" + ) + raise NotImplementedError(msg) + x = self.encoder.transform([config]) + out: dict[str, float] = {} + for name, model in zip(self.observable_names, self.models, strict=True): + _, std = model.predict(x, return_std=True) + out[name] = float(std[0]) + return out + + +def fit_surrogate( + results: ResultsTable, + factors: list[Factor], + *, + method: str = "gp", + seed: int = 0, + n_estimators: int = 200, +) -> SurrogateModel: + """Fit a per-observable surrogate over a :class:`ResultsTable`. + + Rows whose score column contains ``NaN`` are dropped on a + per-observable basis (so a partially-evaluated trial still + contributes to the observables it does have). + + Args: + results: A :class:`ResultsTable` from a previous study run. + factors: Factor definitions used to encode ``results.configs``. + Must cover every key in the configs. + method: ``"gp"`` for Gaussian process (Matern 1.5 + WhiteKernel) + or ``"rf"`` for a random forest. + seed: Random seed forwarded to the backend estimators. + n_estimators: Number of trees for the ``"rf"`` backend; ignored + for ``"gp"``. + + Returns: + A fitted :class:`SurrogateModel`. + + Raises: + ValueError: If ``method`` is unknown, ``results`` is empty, or + no observable has at least two non-NaN training rows. + """ + if method not in _SUPPORTED_METHODS: + msg = ( + f"Unknown surrogate method {method!r}. " + f"Supported: {sorted(_SUPPORTED_METHODS)}" + ) + raise ValueError(msg) + if not results.configs: + msg = "fit_surrogate: results table is empty" + raise ValueError(msg) + + encoder = _FactorEncoder.from_factors(factors) + x_full = encoder.transform(results.configs) + + models: list[Any] = [] + fitted_obs: list[str] = [] + for j, name in enumerate(results.observable_names): + y = results.scores[:, j] + mask = ~np.isnan(y) + if int(mask.sum()) < 2: + continue + model = _make_estimator(method, seed=seed, n_estimators=n_estimators) + model.fit(x_full[mask], y[mask]) + models.append(model) + fitted_obs.append(name) + + if not models: + msg = ( + "fit_surrogate: no observable has at least 2 non-NaN training " + "rows; nothing to fit" + ) + raise ValueError(msg) + + return SurrogateModel( + method=method, + encoder=encoder, + observable_names=fitted_obs, + models=models, + ) + + +def _make_estimator(method: str, *, seed: int, n_estimators: int) -> Any: # noqa: ANN401 + """Construct an unfitted scikit-learn estimator for the requested method. + + Args: + method: ``"gp"`` or ``"rf"`` (validated by the caller). + seed: Random seed. + n_estimators: Number of trees (RF only). + + Returns: + An unfitted scikit-learn regressor. + """ + if method == "gp": + from sklearn.gaussian_process import ( # type: ignore[import-untyped] + GaussianProcessRegressor, + ) + from sklearn.gaussian_process.kernels import ( # type: ignore[import-untyped] + ConstantKernel, + Matern, + WhiteKernel, + ) + + kernel = ConstantKernel(1.0) * Matern(length_scale=1.0, nu=1.5) + WhiteKernel( + noise_level=1e-3, + ) + return GaussianProcessRegressor( + kernel=kernel, + normalize_y=True, + random_state=seed, + n_restarts_optimizer=2, + ) + from sklearn.ensemble import ( # type: ignore[import-untyped] + RandomForestRegressor, + ) + + return RandomForestRegressor( + n_estimators=n_estimators, + random_state=seed, + ) diff --git a/tests/test_surrogate.py b/tests/test_surrogate.py new file mode 100644 index 0000000..d160723 --- /dev/null +++ b/tests/test_surrogate.py @@ -0,0 +1,229 @@ +"""Tests for surrogate module (#82).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy as np +import pytest + +from trade_study import ( + Direction, + Factor, + FactorType, + Observable, + SurrogateModel, + build_grid, + fit_surrogate, + run_grid, +) + +if TYPE_CHECKING: + from trade_study.protocols import ResultsTable + + +class _LinearWorld: + """Trivial simulator: passes config through.""" + + def generate( + self, config: dict[str, float] + ) -> tuple[dict[str, float], dict[str, float]]: + return config, config + + +class _LinearScorer: + """Scorer where ``y = 2*alpha + 0.5*beta`` and ``z = alpha**2``.""" + + def score( + self, + truth: object, + observations: dict[str, float], + config: dict[str, float], + ) -> dict[str, float]: + del truth, config + a = float(observations["alpha"]) + b = float(observations["beta"]) + return {"y": 2.0 * a + 0.5 * b, "z": a * a} + + +@pytest.fixture +def continuous_factors() -> list[Factor]: + return [ + Factor("alpha", FactorType.CONTINUOUS, bounds=(0.0, 1.0)), + Factor("beta", FactorType.CONTINUOUS, bounds=(0.0, 1.0)), + ] + + +@pytest.fixture +def mixed_factors() -> list[Factor]: + return [ + Factor("alpha", FactorType.CONTINUOUS, bounds=(0.0, 1.0)), + Factor("kind", FactorType.CATEGORICAL, levels=["red", "green", "blue"]), + ] + + +def _make_results(factors: list[Factor], n: int = 32, seed: int = 0) -> ResultsTable: + grid = build_grid(factors, method="sobol", n_samples=n, seed=seed) + obs = [ + Observable("y", Direction.MINIMIZE), + Observable("z", Direction.MINIMIZE), + ] + return run_grid(_LinearWorld(), _LinearScorer(), grid, obs) + + +# --------------------------------------------------------------------------- +# Method dispatch and validation +# --------------------------------------------------------------------------- + + +def test_fit_surrogate_unknown_method(continuous_factors: list[Factor]) -> None: + results = _make_results(continuous_factors, n=8) + with pytest.raises(ValueError, match="Unknown surrogate method"): + fit_surrogate(results, continuous_factors, method="bogus") + + +def test_fit_surrogate_empty_results(continuous_factors: list[Factor]) -> None: + results = _make_results(continuous_factors, n=2) + results.configs = [] + results.scores = np.empty((0, 2)) + with pytest.raises(ValueError, match="empty"): + fit_surrogate(results, continuous_factors) + + +def test_fit_surrogate_all_nan(continuous_factors: list[Factor]) -> None: + results = _make_results(continuous_factors, n=8) + results.scores[:] = np.nan + with pytest.raises(ValueError, match="non-NaN"): + fit_surrogate(results, continuous_factors, method="rf") + + +# --------------------------------------------------------------------------- +# GP backend +# --------------------------------------------------------------------------- + + +def test_gp_predict_recovers_linear(continuous_factors: list[Factor]) -> None: + results = _make_results(continuous_factors, n=64, seed=0) + model = fit_surrogate(results, continuous_factors, method="gp", seed=0) + pred = model.predict({"alpha": 0.5, "beta": 0.5}) + # Truth: y = 2*0.5 + 0.5*0.5 = 1.25, z = 0.25 + assert pred["y"] == pytest.approx(1.25, abs=0.05) + assert pred["z"] == pytest.approx(0.25, abs=0.05) + + +def test_gp_uncertainty_returns_floats(continuous_factors: list[Factor]) -> None: + results = _make_results(continuous_factors, n=32) + model = fit_surrogate(results, continuous_factors, method="gp", seed=0) + unc = model.uncertainty({"alpha": 0.5, "beta": 0.5}) + assert set(unc) == {"y", "z"} + assert all(isinstance(v, float) and v >= 0.0 for v in unc.values()) + + +def test_gp_predict_batch_shape(continuous_factors: list[Factor]) -> None: + results = _make_results(continuous_factors, n=16) + model = fit_surrogate(results, continuous_factors, method="gp", seed=0) + batch = [{"alpha": a, "beta": 0.5} for a in [0.0, 0.25, 0.5, 0.75, 1.0]] + pred = model.predict_batch(batch) + assert pred["y"].shape == (5,) + assert pred["z"].shape == (5,) + + +# --------------------------------------------------------------------------- +# RF backend +# --------------------------------------------------------------------------- + + +def test_rf_predict_runs(continuous_factors: list[Factor]) -> None: + results = _make_results(continuous_factors, n=32) + model = fit_surrogate( + results, + continuous_factors, + method="rf", + seed=0, + n_estimators=50, + ) + assert isinstance(model, SurrogateModel) + pred = model.predict({"alpha": 0.5, "beta": 0.5}) + assert set(pred) == {"y", "z"} + + +def test_rf_uncertainty_raises(continuous_factors: list[Factor]) -> None: + results = _make_results(continuous_factors, n=16) + model = fit_surrogate(results, continuous_factors, method="rf", seed=0) + with pytest.raises(NotImplementedError, match="method='gp'"): + model.uncertainty({"alpha": 0.5, "beta": 0.5}) + + +# --------------------------------------------------------------------------- +# Mixed (categorical) factor encoding +# --------------------------------------------------------------------------- + + +def test_mixed_factor_encoding(mixed_factors: list[Factor]) -> None: + grid = build_grid(mixed_factors, method="sobol", n_samples=24, seed=0) + obs = [Observable("y", Direction.MINIMIZE)] + + class _MixedScorer: + def score( + self, + truth: object, + observations: dict[str, object], + config: dict[str, object], + ) -> dict[str, float]: + del truth, config + offset = {"red": 0.0, "green": 1.0, "blue": -1.0}[str(observations["kind"])] + return {"y": float(observations["alpha"]) + offset} + + results = run_grid(_LinearWorld(), _MixedScorer(), grid, obs) + model = fit_surrogate(results, mixed_factors, method="rf", seed=0) + pred_red = model.predict({"alpha": 0.5, "kind": "red"})["y"] + pred_green = model.predict({"alpha": 0.5, "kind": "green"})["y"] + pred_blue = model.predict({"alpha": 0.5, "kind": "blue"})["y"] + # Order should be preserved by the encoding even with a coarse RF. + assert pred_blue < pred_red < pred_green + + +class _AlphaScorer: + """Scorer that only depends on ``alpha``.""" + + def score( + self, + truth: object, + observations: dict[str, object], + config: dict[str, object], + ) -> dict[str, float]: + del truth, config + return {"y": float(observations["alpha"])} + + +def test_predict_rejects_unknown_level(mixed_factors: list[Factor]) -> None: + grid = build_grid(mixed_factors, method="sobol", n_samples=8, seed=0) + obs = [Observable("y", Direction.MINIMIZE)] + results = run_grid(_LinearWorld(), _AlphaScorer(), grid, obs) + model = fit_surrogate(results, mixed_factors, method="rf", seed=0) + with pytest.raises(ValueError, match="not in declared levels"): + model.predict({"alpha": 0.5, "kind": "purple"}) + + +def test_predict_rejects_missing_factor(mixed_factors: list[Factor]) -> None: + grid = build_grid(mixed_factors, method="sobol", n_samples=8, seed=0) + obs = [Observable("y", Direction.MINIMIZE)] + results = run_grid(_LinearWorld(), _AlphaScorer(), grid, obs) + model = fit_surrogate(results, mixed_factors, method="rf", seed=0) + with pytest.raises(KeyError, match="missing factor"): + model.predict({"alpha": 0.5}) + + +# --------------------------------------------------------------------------- +# NaN handling +# --------------------------------------------------------------------------- + + +def test_partial_nan_drops_rows(continuous_factors: list[Factor]) -> None: + results = _make_results(continuous_factors, n=32) + # Knock out half the rows for observable "y" only. + results.scores[::2, 0] = np.nan + model = fit_surrogate(results, continuous_factors, method="rf", seed=0) + pred = model.predict({"alpha": 0.5, "beta": 0.5}) + # Both observables should still be predictable since z has no NaNs. + assert set(pred) == {"y", "z"}