|
20 | 20 | from sklearn.ensemble import ( # type: ignore[import-untyped] |
21 | 21 | HistGradientBoostingRegressor, |
22 | 22 | ) |
| 23 | +from sklearn.impute import SimpleImputer # type: ignore[import-untyped] |
| 24 | +from sklearn.linear_model import Ridge # type: ignore[import-untyped] |
| 25 | +from sklearn.pipeline import Pipeline # type: ignore[import-untyped] |
23 | 26 |
|
24 | 27 | if TYPE_CHECKING: |
25 | 28 | from app.features.forecasting.schemas import ModelConfig |
26 | 29 |
|
27 | 30 |
|
# Partition of the canonical 14-column feature frame into the three
# Prophet-style additive components. Every canonical column appears in exactly
# one tuple below (5 + 6 + 3 = 14), so the per-component partial sums add up to
# the full ``coef_ · x`` term — that is what makes the additive invariant hold.
# Column order is defined by ``canonical_feature_columns()`` in
# ``app/shared/feature_frames``.
_PROPHET_LIKE_COMPONENTS: dict[str, tuple[str, ...]] = {
    # Autoregressive lags plus the product-age counter.
    "trend": (
        "lag_1",
        "lag_7",
        "lag_14",
        "lag_28",
        "days_since_launch",
    ),
    # Calendar / cyclical encodings.
    "seasonality": (
        "dow_sin",
        "dow_cos",
        "month_sin",
        "month_cos",
        "is_weekend",
        "is_month_end",
    ),
    # Holiday flag plus the extra regressors (price, promotion).
    "holiday_regressor": (
        "price_factor",
        "promo_active",
        "is_holiday",
    ),
}
| 48 | + |
| 49 | + |
@dataclass
class ForecastDecomposition:
    """Four-way additive split of a Prophet-like forecast.

    For any feature matrix ``X`` the pieces satisfy, element-wise and within
    float tolerance::

        intercept + trend + seasonality + holiday_regressor == predict(...)

    The three array fields each hold one value per forecast row, i.e. they
    have shape ``[n_rows]``.

    Attributes:
        intercept: Fitted Ridge intercept — a single scalar that is broadcast
            across every row.
        trend: Row-wise contribution of the trend columns (the autoregressive
            lags and ``days_since_launch``).
        seasonality: Row-wise contribution of the calendar/seasonal columns.
        holiday_regressor: Row-wise contribution of the holiday flag and the
            extra regressors (price, promotion).
    """

    intercept: float
    trend: np.ndarray[Any, np.dtype[np.floating[Any]]]
    seasonality: np.ndarray[Any, np.dtype[np.floating[Any]]]
    holiday_regressor: np.ndarray[Any, np.dtype[np.floating[Any]]]
| 71 | + |
| 72 | + |
28 | 73 | @dataclass |
29 | 74 | class FitResult: |
30 | 75 | """Result of model fitting. |
@@ -888,9 +933,191 @@ def set_params(self, **params: Any) -> XGBoostForecaster: # noqa: ANN401 |
888 | 933 | return self |
889 | 934 |
|
890 | 935 |
|
class ProphetLikeForecaster(BaseForecaster):
    """Feature-aware ADDITIVE forecaster — Ridge over the canonical frame.

    Prophet-LIKE, not Prophet: it approximates Prophet's additive trend +
    seasonality + holiday/regressor decomposition with a regularized linear
    model over the already-engineered 14-column feature frame. It REQUIRES a
    non-``None`` exogenous ``X`` for both ``fit`` and ``predict``.

    The fitted estimator is a scikit-learn ``Pipeline`` of two deterministic
    steps: a ``SimpleImputer(strategy="median")`` that fills the ``NaN`` lag
    cells the future feature frame emits (a bare ``Ridge`` raises
    ``ValueError: Input contains NaN``), followed by a
    ``Ridge(solver="cholesky")`` whose closed-form L2-regularized fit is
    robust to the collinear engineered columns. Folding the imputer INSIDE the
    pipeline keeps the no-leakage invariant: it learns its medians on the
    training ``X`` only and re-applies them at predict time.

    ``decompose()`` returns the per-component additive contributions of a
    forecast — the literal ``y_hat = intercept + trend + seasonality +
    holiday_regressor`` split, computed on the IMPUTED ``X``.

    NOT modelled (deliberately — see PRP-MLZOO-C2 Risks): changepoint trend,
    posterior uncertainty intervals, automatic seasonality discovery,
    multiplicative seasonality. This is an additive linear approximation, not
    the real ``prophet`` package.

    Attributes:
        alpha: Ridge L2 regularization strength (0.0 degenerates to OLS).
    """

    requires_features: ClassVar[bool] = True
    """A feature-aware model — ``fit``/``predict`` REQUIRE a non-None ``X``."""

    def __init__(self, *, alpha: float = 1.0, random_state: int = 42) -> None:
        """Initialize the Prophet-like additive forecaster.

        Args:
            alpha: Ridge L2 regularization strength. The default 1.0 keeps
                coefficients robust to the collinear engineered-feature frame.
            random_state: Kept for interface parity with the other forecasters;
                ``Ridge(solver="cholesky")`` is closed-form and needs no seed.
        """
        super().__init__(random_state)
        self.alpha = alpha
        # Populated by ``fit``; ``None`` means "not fitted yet".
        self._estimator: Any = None

    def fit(
        self,
        y: np.ndarray[Any, np.dtype[np.floating[Any]]],
        X: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None,
    ) -> ProphetLikeForecaster:
        """Fit the additive Ridge pipeline on historical features.

        Args:
            y: Target values (1D array of shape ``[n_samples]``).
            X: Exogenous features (2D array of shape ``[n_samples, n_features]``).
                REQUIRED — unlike the baseline forecasters.

        Returns:
            self (for method chaining).

        Raises:
            ValueError: If ``X`` is ``None``, ``y`` is empty, or the row counts
                of ``X`` and ``y`` do not match.
        """
        if X is None:
            raise ValueError("ProphetLikeForecaster requires exogenous features X for fit()")
        if len(y) == 0:
            raise ValueError("Cannot fit on empty array")
        if X.shape[0] != len(y):
            raise ValueError(
                f"X has {X.shape[0]} rows but y has {len(y)} — feature/target rows must match"
            )
        # The imputer learns its per-column medians on THIS training X only;
        # the Ridge solver is deterministic and closed-form.
        estimator: Any = Pipeline(
            [
                ("impute", SimpleImputer(strategy="median")),
                ("ridge", Ridge(alpha=self.alpha, solver="cholesky")),
            ]
        )
        estimator.fit(X, y)
        self._estimator = estimator
        self._last_values = np.asarray(y[-1:], dtype=np.float64)
        self._is_fitted = True
        return self

    def predict(
        self,
        horizon: int,
        X: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None,
    ) -> np.ndarray[Any, np.dtype[np.floating[Any]]]:
        """Generate forecasts from a future feature frame.

        Args:
            horizon: Number of steps to forecast.
            X: Exogenous features for the forecast period, shape
                ``[horizon, n_features]``. REQUIRED.

        Returns:
            Array of forecasts with shape ``[horizon]``.

        Raises:
            RuntimeError: If the model has not been fitted.
            ValueError: If ``X`` is ``None`` or its row count is not ``horizon``.
        """
        if not self._is_fitted or self._estimator is None:
            raise RuntimeError("Model must be fitted before predict")
        if X is None:
            raise ValueError("ProphetLikeForecaster requires exogenous features X for predict()")
        if X.shape[0] != horizon:
            raise ValueError(f"X has {X.shape[0]} rows but horizon is {horizon} — they must match")
        # The Pipeline imputes the NaN lag cells, then the Ridge predicts.
        predictions = self._estimator.predict(X)
        result: np.ndarray[Any, np.dtype[np.floating[Any]]] = np.asarray(
            predictions, dtype=np.float64
        )
        return result

    def decompose(self, X: np.ndarray[Any, np.dtype[np.floating[Any]]]) -> ForecastDecomposition:
        """Split a forecast into its additive trend / seasonality / regressor parts.

        Operates on the IMPUTED ``X`` — the trained imputer's ``transform`` —
        so the per-component contributions sum EXACTLY to ``predict(...)``: any
        ``NaN`` cell is filled with the TRAINING-window median, never a
        predict-time median (no leakage). Each component contribution is the
        partial sum ``Σ_{i ∈ component} coef_i · x_i``; together the three
        component column-sets partition all 14 canonical columns, so
        ``intercept + trend + seasonality + holiday_regressor == predict()``.

        Columns that were entirely ``NaN`` in the training window are DROPPED
        by ``SimpleImputer`` (their ``statistics_`` entry is ``NaN``), so the
        Ridge never saw them. Canonical column indices are therefore mapped to
        their post-imputation positions before indexing; a dropped column
        contributes exactly ``0`` to its component, which keeps the additive
        invariant intact instead of mis-attributing or raising ``IndexError``.

        Args:
            X: Feature matrix of shape ``[n_rows, n_features]`` (the same frame
                a ``predict`` call would consume). May contain ``NaN`` cells.

        Returns:
            A :class:`ForecastDecomposition` with the four-way breakdown.

        Raises:
            RuntimeError: If the model has not been fitted.
            ValueError: If the model was fitted on a frame whose width differs
                from the canonical column list, so positions cannot be mapped.
        """
        from app.shared.feature_frames import canonical_feature_columns

        if not self._is_fitted or self._estimator is None:
            raise RuntimeError("Model must be fitted before decompose")
        imputer = self._estimator.named_steps["impute"]
        ridge = self._estimator.named_steps["ridge"]
        columns = list(canonical_feature_columns())

        # One statistic per column seen at fit time; NaN marks a column the
        # imputer discards in ``transform`` because it had no observed value.
        statistics = np.asarray(imputer.statistics_, dtype=np.float64)
        if statistics.shape[0] != len(columns):
            raise ValueError(
                f"decompose() expects the {len(columns)}-column canonical frame but the "
                f"model was fitted on {statistics.shape[0]} columns"
            )

        x_imputed = np.asarray(imputer.transform(X), dtype=np.float64)
        coef = np.asarray(ridge.coef_, dtype=np.float64)

        # canonical column index -> column position in ``x_imputed`` / ``coef``.
        kept = np.flatnonzero(~np.isnan(statistics))
        position = {int(col_idx): pos for pos, col_idx in enumerate(kept)}

        contributions: dict[str, np.ndarray[Any, np.dtype[np.floating[Any]]]] = {}
        for component, comp_cols in _PROPHET_LIKE_COMPONENTS.items():
            canonical_idx = [columns.index(c) for c in comp_cols]
            # Dropped columns have no coefficient; skipping them is the same
            # as a zero contribution. An empty selection yields a zero vector.
            idx = np.asarray(
                [position[i] for i in canonical_idx if i in position], dtype=np.intp
            )
            contributions[component] = np.asarray(x_imputed[:, idx] @ coef[idx], dtype=np.float64)
        return ForecastDecomposition(
            intercept=float(ridge.intercept_),
            trend=contributions["trend"],
            seasonality=contributions["seasonality"],
            holiday_regressor=contributions["holiday_regressor"],
        )

    def get_params(self) -> dict[str, Any]:
        """Get model parameters.

        Returns:
            Dictionary with alpha and random_state.
        """
        return {"alpha": self.alpha, "random_state": self.random_state}

    def set_params(self, **params: Any) -> ProphetLikeForecaster:  # noqa: ANN401
        """Set model parameters.

        Each keyword is assigned as an attribute of the same name; an already
        fitted estimator is NOT refitted, so call ``fit`` again for a changed
        ``alpha`` to take effect.

        Args:
            **params: Parameter names and values to set.

        Returns:
            self (for method chaining).
        """
        for key, value in params.items():
            setattr(self, key, value)
        return self
| 1116 | + |
| 1117 | + |
# Closed set of model-type identifiers accepted by the model factory.
ModelType = Literal[
    "naive",
    "seasonal_naive",
    "moving_average",
    "xgboost",
    "lightgbm",
    "regression",
    "prophet_like",
]
895 | 1122 |
|
896 | 1123 |
|
@@ -974,5 +1201,13 @@ def model_factory(config: ModelConfig, random_state: int = 42) -> BaseForecaster |
974 | 1201 | random_state=random_state, |
975 | 1202 | ) |
976 | 1203 | raise ValueError("Invalid config type for regression") |
| 1204 | + elif model_type == "prophet_like": |
| 1205 | + # No flag gate — the Prophet-like model is pure scikit-learn and ships |
| 1206 | + # always-enabled, exactly like ``regression``. |
| 1207 | + from app.features.forecasting.schemas import ProphetLikeModelConfig |
| 1208 | + |
| 1209 | + if isinstance(config, ProphetLikeModelConfig): |
| 1210 | + return ProphetLikeForecaster(alpha=config.alpha, random_state=random_state) |
| 1211 | + raise ValueError("Invalid config type for prophet_like") |
977 | 1212 | else: |
978 | 1213 | raise ValueError(f"Unknown model type: {model_type}") |
0 commit comments