diff --git a/docs/api/design.md b/docs/api/design.md index 3fb75a3..a3614eb 100644 --- a/docs/api/design.md +++ b/docs/api/design.md @@ -6,6 +6,8 @@ Experimental design: factors, grids, and screening. ::: trade_study.Factor +::: trade_study.FactorConstraint + ::: trade_study.build_grid ::: trade_study.reduce_factors diff --git a/src/trade_study/__init__.py b/src/trade_study/__init__.py index 8d167d0..f061df3 100644 --- a/src/trade_study/__init__.py +++ b/src/trade_study/__init__.py @@ -6,7 +6,14 @@ from ._pareto import extract_front, hypervolume, igd_plus, pareto_rank from ._scoring import coverage_curve, score from ._version import __version__ -from .design import Factor, FactorType, build_grid, reduce_factors, screen +from .design import ( + Factor, + FactorConstraint, + FactorType, + build_grid, + reduce_factors, + screen, +) from .io import load_results, save_results from .protocols import ( Annotation, @@ -34,6 +41,7 @@ "Constraint", "Direction", "Factor", + "FactorConstraint", "FactorType", "Observable", "Phase", diff --git a/src/trade_study/design.py b/src/trade_study/design.py index f407f65..d7ed354 100644 --- a/src/trade_study/design.py +++ b/src/trade_study/design.py @@ -5,7 +5,7 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from enum import Enum from itertools import product from typing import TYPE_CHECKING, Any @@ -13,10 +13,17 @@ import numpy as np if TYPE_CHECKING: - from collections.abc import Callable + from collections.abc import Callable, Sequence from numpy.typing import NDArray +# Maximum oversample factor when rejection-sampling a constrained QMC/LHS +# design before giving up. +_MAX_OVERSAMPLE: int = 64 +# Warn when the realized feasibility ratio falls below this fraction; below +# this, QMC space-filling guarantees degrade noticeably. +_LOW_FEASIBILITY_WARN: float = 0.1 + class FactorType(Enum): """Type of design factor.""" @@ -73,6 +80,58 @@ def __post_init__(self) -> None: raise ValueError(msg) +@dataclass(frozen=True) +class FactorConstraint: + """Coupled-factor constraint applied at design-generation time. + + Unlike :class:`trade_study.protocols.Constraint`, which filters trials + after simulation based on observables, a ``FactorConstraint`` filters + candidate configurations *before* they are evaluated. It is the way + to express coupled constraints such as "if ``method == 'a'`` then + ``patience <= 2``" without distorting the design via post-hoc + filtering of returned results. + + Attributes: + predicate: Callable taking a candidate config dict and returning + ``True`` when the config is feasible. + name: Human-readable label used in error messages and warnings. + """ + + predicate: Callable[[dict[str, Any]], bool] + name: str = field(default="factor_constraint") + + def is_feasible(self, config: dict[str, Any]) -> bool: + """Return whether ``config`` satisfies this constraint. + + Args: + config: Candidate design point. + + Returns: + ``True`` if the predicate accepts the config. + """ + return bool(self.predicate(config)) + + +def _all_feasible( + config: dict[str, Any], + constraints: Sequence[FactorConstraint] | None, +) -> bool: + """Return whether ``config`` satisfies every constraint. + + Args: + config: Candidate design point. + constraints: Constraints to check; ``None`` is treated as no + constraints. + + Returns: + ``True`` if all constraints accept ``config`` (or no constraints + were supplied). + """ + if not constraints: + return True + return all(c.is_feasible(config) for c in constraints) + + def build_grid( factors: list[Factor], *, @@ -80,6 +139,7 @@ def build_grid( n_samples: int = 100, seed: int = 42, scramble: bool = True, + constraints: Sequence[FactorConstraint] | None = None, ) -> list[dict[str, Any]]: """Build an experimental design grid. @@ -95,17 +155,31 @@ def build_grid( seed: Random seed. scramble: Whether to apply scrambling to QMC sequences (Sobol / Halton). Ignored for other methods. + constraints: Optional sequence of :class:`FactorConstraint` + applied at design-generation time. For ``"full"``, the + Cartesian product is filtered. For ``"lhs"``/``"sobol"``/ + ``"halton"``, rejection sampling with oversampling is used + to return ``n_samples`` feasible configs (rejection breaks + the QMC space-filling guarantee; expect higher discrepancy + when the feasibility ratio is low). Returns: List of config dictionaries, one per design point. Raises: - ValueError: If an unknown design method is specified. + ValueError: If an unknown design method is specified, or if + constraints reject more than the maximum oversample budget + allows. """ if method == "full": - return _full_factorial(factors) + return _full_factorial(factors, constraints=constraints) if method == "lhs": - return _latin_hypercube(factors, n_samples=n_samples, seed=seed) + return _latin_hypercube( + factors, + n_samples=n_samples, + seed=seed, + constraints=constraints, + ) if method in {"sobol", "halton"}: return _qmc_sample( factors, @@ -113,16 +187,27 @@ def build_grid( seed=seed, qmc_method=method, scramble=scramble, + constraints=constraints, ) msg = f"Unknown design method: {method!r}" raise ValueError(msg) -def _full_factorial(factors: list[Factor]) -> list[dict[str, Any]]: - """Full factorial over all factor levels. +def _full_factorial( + factors: list[Factor], + *, + constraints: Sequence[FactorConstraint] | None = None, +) -> list[dict[str, Any]]: + """Full factorial over all factor levels, optionally constrained. + + Args: + factors: Categorical or discrete factors. + constraints: Optional design-time constraints to filter the + Cartesian product. Returns: - List of config dictionaries, one per design point. + List of config dictionaries, one per design point. Empty if + every combination is rejected by the constraints. Raises: ValueError: If a factor has bounds instead of levels. @@ -135,7 +220,32 @@ def _full_factorial(factors: list[Factor]) -> list[dict[str, Any]]: msg = f"Full factorial requires levels, not bounds, for factor '{f.name}'" raise ValueError(msg) names = [f.name for f in factors] - return [dict(zip(names, combo, strict=True)) for combo in product(*level_lists)] + configs = (dict(zip(names, combo, strict=True)) for combo in product(*level_lists)) + return [c for c in configs if _all_feasible(c, constraints)] + + +def _row_to_config( + row: NDArray[np.floating[Any]], factors: list[Factor] +) -> dict[str, Any]: + """Map a unit-cube QMC/LHS row into a factor-keyed config dict. + + Args: + row: Array in [0, 1) per dimension, one entry per factor. + factors: Factor list defining each dimension. + + Returns: + Config dictionary keyed by factor name. + """ + cfg: dict[str, Any] = {} + for j, f in enumerate(factors): + if f.factor_type == FactorType.CONTINUOUS and f.bounds is not None: + lo, hi = f.bounds + cfg[f.name] = lo + float(row[j]) * (hi - lo) + elif f.levels is not None: + idx = int(row[j] * len(f.levels)) + idx = min(idx, len(f.levels) - 1) + cfg[f.name] = f.levels[idx] + return cfg def _latin_hypercube( @@ -143,30 +253,44 @@ def _latin_hypercube( *, n_samples: int, seed: int, + constraints: Sequence[FactorConstraint] | None = None, ) -> list[dict[str, Any]]: - """Latin hypercube design via pyDOE3. + """Latin hypercube design via pyDOE3, with optional rejection. + + Args: + factors: Design factors. + n_samples: Number of feasible samples to return. + seed: Random seed. + constraints: Optional design-time constraints; rejected samples + are discarded and additional LHS batches are drawn until + ``n_samples`` feasible configs are collected (or the + oversample budget is exhausted). Returns: - List of config dictionaries, one per design point. + List of feasible config dictionaries with length ``n_samples``. + Propagates :class:`ValueError` from the rejection sampler when + the feasibility ratio is too low. """ from pyDOE3 import lhs # type: ignore[import-untyped] n_factors = len(factors) - raw = lhs(n_factors, samples=n_samples, criterion="maximin", seed=seed) - - configs: list[dict[str, Any]] = [] - for row in raw: - cfg: dict[str, Any] = {} - for j, f in enumerate(factors): - if f.factor_type == FactorType.CONTINUOUS and f.bounds is not None: - lo, hi = f.bounds - cfg[f.name] = lo + row[j] * (hi - lo) - elif f.levels is not None: - idx = int(row[j] * len(f.levels)) - idx = min(idx, len(f.levels) - 1) - cfg[f.name] = f.levels[idx] - configs.append(cfg) - return configs + if not constraints: + raw = lhs(n_factors, samples=n_samples, criterion="maximin", seed=seed) + return [_row_to_config(row, factors) for row in raw] + + return _rejection_sample( + n_samples=n_samples, + constraints=constraints, + method_name="lhs", + draw=lambda batch_size, batch_seed: lhs( + n_factors, + samples=batch_size, + criterion="maximin", + seed=batch_seed, + ), + factors=factors, + seed=seed, + ) def _qmc_sample( @@ -176,6 +300,7 @@ def _qmc_sample( seed: int, qmc_method: str, scramble: bool, + constraints: Sequence[FactorConstraint] | None = None, ) -> list[dict[str, Any]]: """Quasi-Monte Carlo design via ``scipy.stats.qmc``. @@ -185,9 +310,16 @@ def _qmc_sample( seed: Random seed for scrambling. qmc_method: ``"sobol"`` or ``"halton"``. scramble: Whether to apply scrambling. + constraints: Optional design-time constraints. When supplied, + the QMC sampler is advanced as a single long sequence and + infeasible points are skipped, preserving the ordering of + the underlying low-discrepancy sequence among accepted + points (rejection still degrades the realized discrepancy). Returns: - List of config dictionaries, one per design point. + List of config dictionaries with length ``n_samples``. + Propagates :class:`ValueError` from the rejection sampler when + the feasibility ratio is too low. """ from scipy.stats import qmc # type: ignore[import-untyped] @@ -197,21 +329,92 @@ def _qmc_sample( sampler = qmc.Sobol(d=n_factors, scramble=scramble, seed=seed) else: sampler = qmc.Halton(d=n_factors, scramble=scramble, seed=seed) - raw = sampler.random(n_samples) - - configs: list[dict[str, Any]] = [] - for row in raw: - cfg: dict[str, Any] = {} - for j, f in enumerate(factors): - if f.factor_type == FactorType.CONTINUOUS and f.bounds is not None: - lo, hi = f.bounds - cfg[f.name] = lo + row[j] * (hi - lo) - elif f.levels is not None: - idx = int(row[j] * len(f.levels)) - idx = min(idx, len(f.levels) - 1) - cfg[f.name] = f.levels[idx] - configs.append(cfg) - return configs + + if not constraints: + raw = sampler.random(n_samples) + return [_row_to_config(row, factors) for row in raw] + + return _rejection_sample( + n_samples=n_samples, + constraints=constraints, + method_name=qmc_method, + draw=lambda batch_size, _seed: sampler.random(batch_size), + factors=factors, + seed=seed, + ) + + +def _rejection_sample( + *, + n_samples: int, + constraints: Sequence[FactorConstraint], + method_name: str, + draw: Callable[[int, int], NDArray[np.floating[Any]]], + factors: list[Factor], + seed: int, +) -> list[dict[str, Any]]: + """Collect ``n_samples`` feasible configs via rejection sampling. + + Doubles the oversample factor on each retry until the budget is + exhausted. Emits a :class:`UserWarning` when the realized + feasibility ratio falls below :data:`_LOW_FEASIBILITY_WARN`. + + Args: + n_samples: Number of feasible configs to return. + constraints: Constraints applied to each candidate. + method_name: Name of the sampler (for error messages). + draw: Callable taking ``(batch_size, seed)`` and returning a + ``(batch_size, n_factors)`` unit-cube array. + factors: Factor list used to map rows to configs. + seed: Seed forwarded to ``draw`` (sequence-based samplers may + ignore it after the first call). + + Returns: + Exactly ``n_samples`` feasible configs. + + Raises: + ValueError: If the constraints reject too many candidates to + reach ``n_samples`` within :data:`_MAX_OVERSAMPLE` x the + requested size. + """ + import warnings + + accepted: list[dict[str, Any]] = [] + drawn = 0 + oversample = 2 + while len(accepted) < n_samples and oversample <= _MAX_OVERSAMPLE: + deficit = n_samples - len(accepted) + batch_size = max(deficit * oversample, 1) + rows = draw(batch_size, seed + drawn) + drawn += batch_size + for row in rows: + cfg = _row_to_config(row, factors) + if _all_feasible(cfg, constraints): + accepted.append(cfg) + if len(accepted) == n_samples: + break + oversample *= 2 + + if len(accepted) < n_samples: + ratio = len(accepted) / max(drawn, 1) + msg = ( + f"{method_name}: constraints rejected too many candidates " + f"({len(accepted)}/{n_samples} feasible after drawing {drawn}; " + f"feasibility ratio {ratio:.3g}). Loosen constraints or " + f"reduce n_samples." + ) + raise ValueError(msg) + + ratio = len(accepted) / drawn + if ratio < _LOW_FEASIBILITY_WARN: + warnings.warn( + f"{method_name}: low feasibility ratio {ratio:.3g} " + f"({len(accepted)}/{drawn}); QMC space-filling properties " + f"are degraded by rejection.", + UserWarning, + stacklevel=3, + ) + return accepted def screen( diff --git a/tests/test_design.py b/tests/test_design.py index dcf2700..304f0f8 100644 --- a/tests/test_design.py +++ b/tests/test_design.py @@ -7,7 +7,14 @@ import numpy as np import pytest -from trade_study.design import Factor, FactorType, build_grid, reduce_factors, screen +from trade_study.design import ( + Factor, + FactorConstraint, + FactorType, + build_grid, + reduce_factors, + screen, +) # --------------------------------------------------------------------------- # Fixtures @@ -436,3 +443,149 @@ def test_reduce_high_threshold_drops_all(continuous_factors: list[Factor]) -> No importance = {"y": np.array([0.1, 0.1])} kept = reduce_factors(continuous_factors, importance, threshold=0.5) assert len(kept) == 0 + + +# --------------------------------------------------------------------------- +# FactorConstraint — coupled design-time constraints (#103) +# --------------------------------------------------------------------------- + + +@pytest.fixture +def coupled_factors() -> list[Factor]: + """Two coupled categorical factors used in constraint tests. + + Returns: + ``method`` in {``elbo_only``, ``mixed``, ``full``} and + ``patience`` in {1, 2, 3, 5, 10}. + """ + return [ + Factor( + "method", + FactorType.CATEGORICAL, + levels=["elbo_only", "mixed", "full"], + ), + Factor("patience", FactorType.DISCRETE, levels=[1, 2, 3, 5, 10]), + ] + + +def _elbo_short_patience(cfg: dict[str, Any]) -> bool: + """Toy coupled rule used across constraint tests. + + Args: + cfg: Candidate config dict. + + Returns: + ``True`` unless ``method == "elbo_only"`` with ``patience > 2``. + """ + return cfg["method"] != "elbo_only" or cfg["patience"] <= 2 + + +def test_factor_constraint_is_feasible() -> None: + c = FactorConstraint(_elbo_short_patience, name="elbo_short_patience") + assert c.is_feasible({"method": "elbo_only", "patience": 1}) + assert not c.is_feasible({"method": "elbo_only", "patience": 5}) + assert c.is_feasible({"method": "mixed", "patience": 10}) + + +def test_full_factorial_filters_constraints(coupled_factors: list[Factor]) -> None: + grid = build_grid( + coupled_factors, + method="full", + constraints=[FactorConstraint(_elbo_short_patience)], + ) + # 3 * 5 = 15 unconstrained; reject elbo_only with patience in {3, 5, 10}. + assert len(grid) == 15 - 3 + for cfg in grid: + assert _elbo_short_patience(cfg) + + +def test_full_factorial_no_constraints_unchanged( + coupled_factors: list[Factor], +) -> None: + base = build_grid(coupled_factors, method="full") + same = build_grid(coupled_factors, method="full", constraints=[]) + assert base == same + + +def test_full_factorial_all_rejected_returns_empty( + coupled_factors: list[Factor], +) -> None: + grid = build_grid( + coupled_factors, + method="full", + constraints=[FactorConstraint(lambda _c: False, name="reject_all")], + ) + assert grid == [] + + +@pytest.mark.parametrize("method", ["sobol", "halton", "lhs"]) +def test_qmc_lhs_returns_n_feasible( + coupled_factors: list[Factor], + method: str, +) -> None: + n = 32 + grid = build_grid( + coupled_factors, + method=method, + n_samples=n, + seed=0, + constraints=[FactorConstraint(_elbo_short_patience)], + ) + assert len(grid) == n + for cfg in grid: + assert _elbo_short_patience(cfg) + + +@pytest.mark.parametrize("method", ["sobol", "halton", "lhs"]) +def test_qmc_lhs_constrained_deterministic( + coupled_factors: list[Factor], + method: str, +) -> None: + constraints = [FactorConstraint(_elbo_short_patience)] + g1 = build_grid( + coupled_factors, + method=method, + n_samples=16, + seed=11, + constraints=constraints, + ) + g2 = build_grid( + coupled_factors, + method=method, + n_samples=16, + seed=11, + constraints=constraints, + ) + assert g1 == g2 + + +def test_qmc_constraints_raise_on_infeasible( + coupled_factors: list[Factor], +) -> None: + with pytest.raises(ValueError, match="rejected too many"): + build_grid( + coupled_factors, + method="sobol", + n_samples=8, + seed=0, + constraints=[FactorConstraint(lambda _c: False, name="reject_all")], + ) + + +def test_qmc_low_feasibility_warns() -> None: + """A very tight constraint should still succeed but warn.""" + factors = [ + Factor("x", FactorType.CONTINUOUS, bounds=(0.0, 1.0)), + ] + # Accept only configs in the bottom 5% of x's range — feasibility ~0.05. + constraints = [FactorConstraint(lambda c: c["x"] < 0.05, name="tight")] + with pytest.warns(UserWarning, match="low feasibility ratio"): + grid = build_grid( + factors, + method="sobol", + n_samples=4, + seed=0, + constraints=constraints, + ) + assert len(grid) == 4 + assert all(cfg["x"] < 0.05 for cfg in grid)