Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ FORECAST_DEFAULT_HORIZON=14
FORECAST_MAX_HORIZON=90
FORECAST_MODEL_ARTIFACTS_DIR=./artifacts/models
FORECAST_ENABLE_LIGHTGBM=false
# FORECAST_ENABLE_XGBOOST defaults to false (opt-in; install ml-xgboost extra)
# FORECAST_ENABLE_RANDOM_FOREST=false # PRP-36 optional model — pure sklearn, no extra needed

# RAG Configuration
# Embedding Provider: "openai" or "ollama"
Expand Down
1 change: 1 addition & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class Settings(BaseSettings):
forecast_model_artifacts_dir: str = "./artifacts/models"
forecast_enable_lightgbm: bool = False
forecast_enable_xgboost: bool = False
forecast_enable_random_forest: bool = False

# Backtesting
backtest_max_splits: int = 20
Expand Down
146 changes: 146 additions & 0 deletions app/features/backtesting/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,45 @@ def wape(
name="wape", value=wape_value, n_samples=len(actuals), warnings=warnings
)

@staticmethod
def rmse(
    actuals: np.ndarray[Any, np.dtype[np.floating[Any]]],
    predictions: np.ndarray[Any, np.dtype[np.floating[Any]]],
) -> MetricResult:
    """Compute the Root Mean Squared Error of a forecast.

    Formula: ``sqrt(mean((A - F) ** 2))``

    Squaring the residuals before averaging weights large misses more
    heavily than MAE does, so a single badly missed point moves this
    metric more than many small misses.

    Args:
        actuals: Ground truth values.
        predictions: Predicted values.

    Returns:
        MetricResult named ``"rmse"``. For empty ``actuals`` the value is
        NaN, ``n_samples`` is 0 and the warnings list carries
        ``"Empty array"``.

    Raises:
        ValueError: If ``actuals`` is non-empty and the two arrays have
            different lengths. The emptiness check runs first, so empty
            ``actuals`` return the NaN result without comparing lengths.
    """
    sample_count = len(actuals)

    # Guard: nothing to score — report NaN instead of dividing by zero.
    if sample_count == 0:
        return MetricResult(name="rmse", value=np.nan, n_samples=0, warnings=["Empty array"])

    prediction_count = len(predictions)
    if sample_count != prediction_count:
        raise ValueError(
            f"Length mismatch: actuals={sample_count}, predictions={prediction_count}"
        )

    residuals = actuals - predictions
    mean_squared_error = np.mean(residuals**2)

    return MetricResult(
        name="rmse",
        value=float(np.sqrt(mean_squared_error)),
        n_samples=sample_count,
        warnings=[],
    )

@staticmethod
def bias(
actuals: np.ndarray[Any, np.dtype[np.floating[Any]]],
Expand Down Expand Up @@ -307,6 +346,7 @@ def calculate_all(
"""
return {
"mae": self.mae(actuals, predictions).value,
"rmse": self.rmse(actuals, predictions).value,
"smape": self.smape(actuals, predictions).value,
"wape": self.wape(actuals, predictions).value,
"bias": self.bias(actuals, predictions).value,
Expand Down Expand Up @@ -342,3 +382,109 @@ def aggregate_fold_metrics(
stability[f"{name}_stability"] = np.nan

return aggregated, stability

def aggregate_bucket_metrics(
    self,
    fold_bucket_metrics: list[dict[str, dict[str, float]]],
) -> dict[str, dict[str, float]]:
    """Average per-horizon-bucket metrics over all folds (PRP-36).

    Every bucket id seen in at least one fold gets one entry whose
    metrics are the arithmetic mean of the non-NaN values reported for
    that (bucket, metric) pair. A fold that did not emit a bucket (for
    example ``h_29_plus`` on a 14-day forecast) simply contributes no
    sample to it; it lowers the sample count and never drags the mean.

    Args:
        fold_bucket_metrics: One bucket dict per fold, in the shape
            returned by :func:`compute_bucket_metrics`.

    Returns:
        ``dict[bucket_id, dict[metric_name, mean]]``. Metrics whose every
        observation was NaN are left out, and a bucket with no surviving
        metric is left out entirely. Empty when the input is empty or no
        fold reported a usable value.
    """
    if not fold_bucket_metrics:
        return {}

    # Pass 1 — pool the usable (non-NaN) observations by bucket and metric.
    pooled: dict[str, dict[str, list[float]]] = {}
    for per_fold in fold_bucket_metrics:
        for bucket_id, fold_values in per_fold.items():
            # Register the bucket even if all of its values end up NaN;
            # pass 2 drops it again when nothing was collected.
            observations = pooled.setdefault(bucket_id, {})
            for metric_name, value in fold_values.items():
                if np.isnan(value):
                    continue
                observations.setdefault(metric_name, []).append(value)

    # Pass 2 — reduce each pool to its mean, skipping buckets left empty.
    return {
        bucket_id: {
            metric_name: float(np.mean(samples))
            for metric_name, samples in observations.items()
        }
        for bucket_id, observations in pooled.items()
        if observations
    }


# Horizon-bucket table: each entry is ``(bucket_id, first_day, last_day)``.
# ``compute_bucket_metrics`` walks the entries in this order, treats both
# ends as inclusive, and replaces a ``None`` end with the largest offset
# observed in the fold, so the last bucket absorbs every later day.
HORIZON_BUCKETS: tuple[tuple[str, int, int | None], ...] = (
("h_1_7", 1, 7),
("h_8_14", 8, 14),
("h_15_28", 15, 28),
("h_29_plus", 29, None),
)
"""Per-horizon-bucket boundaries (1-based, inclusive ends; ``None`` = unbounded).

Bucket ids are stable JSON-key-safe strings — keep them in sync with
``app/features/backtesting/schemas.py`` and the Slice C frontend reader.
"""


def compute_bucket_metrics(
    actuals: np.ndarray[Any, np.dtype[np.floating[Any]]],
    predictions: np.ndarray[Any, np.dtype[np.floating[Any]]],
    horizon_offsets: list[int],
) -> dict[str, dict[str, float]]:
    """Score one fold separately for each forecast-horizon bucket (PRP-36).

    Rows are assigned to the buckets of :data:`HORIZON_BUCKETS` by their
    ``horizon_offsets`` value (inclusive ``[start, end]`` range), and each
    non-empty group is scored with :meth:`MetricsCalculator.calculate_all`.
    A bucket that receives no row is left out of the result, so a 14-day
    horizon never produces an ``h_29_plus`` key and consumers never have
    to interpret a NaN placeholder.

    Args:
        actuals: Ground-truth array, length ``H``.
        predictions: Predicted array, length ``H``.
        horizon_offsets: Per-row horizon position, 1-based. Length ``H``.

    Returns:
        ``dict[bucket_id, dict[metric_name, value]]`` holding only the
        buckets that matched at least one row; ``{}`` for empty input.

    Raises:
        ValueError: If the three inputs do not all have the same length.
    """
    n_actuals = len(actuals)
    n_predictions = len(predictions)
    n_offsets = len(horizon_offsets)
    if n_actuals != n_predictions or n_predictions != n_offsets:
        raise ValueError(
            f"array length mismatch: actuals={n_actuals}, "
            f"predictions={n_predictions}, horizon_offsets={n_offsets}"
        )
    if n_actuals == 0:
        return {}

    calculator = MetricsCalculator()
    offsets = np.asarray(horizon_offsets, dtype=np.int64)
    # An open-ended bucket is capped at the furthest offset in this fold,
    # which makes its upper comparison true for every row.
    furthest_offset = int(offsets.max())

    per_bucket: dict[str, dict[str, float]] = {}
    for bucket_id, first_day, last_day in HORIZON_BUCKETS:
        ceiling = furthest_offset if last_day is None else last_day
        in_bucket = (offsets >= first_day) & (offsets <= ceiling)
        if in_bucket.any():
            per_bucket[bucket_id] = calculator.calculate_all(
                actuals[in_bucket], predictions[in_bucket]
            )
    return per_bucket
24 changes: 24 additions & 0 deletions app/features/backtesting/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ class FoldResult(BaseModel):
actuals: Actual values for the test period.
predictions: Predicted values for the test period.
metrics: Dictionary of metric names to values.
horizon_bucket_metrics: PRP-36 — per-horizon-bucket metric block.
Keys are stable bucket ids from
:data:`app.features.backtesting.metrics.HORIZON_BUCKETS`
(``"h_1_7"``, ``"h_8_14"``, ``"h_15_28"``, ``"h_29_plus"``).
Empty buckets are dropped, so a 14-day horizon's payload omits
``h_29_plus`` rather than emitting NaN.
"""

fold_index: int
Expand All @@ -162,6 +168,13 @@ class FoldResult(BaseModel):
actuals: list[float]
predictions: list[float]
metrics: dict[str, float]
horizon_bucket_metrics: dict[str, dict[str, float]] = Field(
default_factory=dict,
description=(
"PRP-36 — per-horizon-bucket metrics keyed by bucket id "
"('h_1_7', 'h_8_14', 'h_15_28', 'h_29_plus'). Empty buckets are dropped."
),
)


class ModelBacktestResult(BaseModel):
Expand All @@ -173,6 +186,10 @@ class ModelBacktestResult(BaseModel):
fold_results: Results for each fold.
aggregated_metrics: Mean metrics across folds.
metric_std: Standard deviation of metrics across folds.
bucketed_aggregated_metrics: PRP-36 — per-horizon-bucket aggregated
means across folds. ``None`` when no fold emitted a non-empty
bucket dict; otherwise keyed by the same bucket ids as
:attr:`FoldResult.horizon_bucket_metrics`.
feature_aware: True when the model consumed a per-fold feature matrix
(``requires_features``); False for target-only baseline models.
exogenous_policy: How the test-window exogenous columns were sourced.
Expand All @@ -186,6 +203,13 @@ class ModelBacktestResult(BaseModel):
fold_results: list[FoldResult]
aggregated_metrics: dict[str, float]
metric_std: dict[str, float]
bucketed_aggregated_metrics: dict[str, dict[str, float]] | None = Field(
default=None,
description=(
"PRP-36 — per-horizon-bucket aggregated metrics across folds. "
"None when no fold emitted bucket metrics."
),
)
feature_aware: bool = False
exogenous_policy: Literal["observed"] | None = None

Expand Down
25 changes: 24 additions & 1 deletion app/features/backtesting/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import get_settings
from app.features.backtesting.metrics import MetricsCalculator
from app.features.backtesting.metrics import MetricsCalculator, compute_bucket_metrics
from app.features.backtesting.schemas import (
BacktestConfig,
BacktestResponse,
Expand Down Expand Up @@ -377,6 +377,7 @@ def _run_model_backtest(
"""
fold_results: list[FoldResult] = []
fold_metrics: list[dict[str, float]] = []
fold_bucket_metrics: list[dict[str, dict[str, float]]] = []

# Probe the capability flag, then build the historical matrix once for
# the whole run (feature-aware path only) — sliced, never rebuilt, for
Expand Down Expand Up @@ -415,6 +416,17 @@ def _run_model_backtest(
)
fold_metrics.append(metrics)

# PRP-36 — per-horizon-bucket metrics. ``test_dates[0]`` anchors
# horizon day 1 so ``(d - test_dates[0]).days + 1`` lands in
# bucket ``h_1_7`` for the first 7 days and walks outward.
horizon_offsets = [(d - split.test_dates[0]).days + 1 for d in split.test_dates]
bucket_metrics = compute_bucket_metrics(
actuals=y_test,
predictions=predictions,
horizon_offsets=horizon_offsets,
)
fold_bucket_metrics.append(bucket_metrics)

# Create fold result
split_boundary = SplitBoundary(
fold_index=split.fold_index,
Expand All @@ -434,6 +446,7 @@ def _run_model_backtest(
actuals=[float(v) for v in y_test],
predictions=[float(v) for v in predictions],
metrics=metrics,
horizon_bucket_metrics=bucket_metrics,
)
else:
# Store minimal fold result without detailed arrays
Expand All @@ -444,21 +457,31 @@ def _run_model_backtest(
actuals=[],
predictions=[],
metrics=metrics,
horizon_bucket_metrics=bucket_metrics,
)

fold_results.append(fold_result)

logger.debug(
"backtest.fold_complete",
fold_index=split.fold_index,
bucket_count=len(bucket_metrics),
model_type=model_config.model_type,
)

# Aggregate metrics
aggregated_metrics, metric_std = self.metrics_calculator.aggregate_fold_metrics(
fold_metrics
)
bucketed_aggregated = self.metrics_calculator.aggregate_bucket_metrics(fold_bucket_metrics)

return ModelBacktestResult(
model_type=model_config.model_type,
config_hash=model_config.config_hash(),
fold_results=fold_results,
aggregated_metrics=aggregated_metrics,
metric_std=metric_std,
bucketed_aggregated_metrics=bucketed_aggregated if bucketed_aggregated else None,
feature_aware=feature_aware,
exogenous_policy="observed" if feature_aware else None,
)
Expand Down
Loading