Skip to content

Commit 52f6497

Browse files
authored
feat(features): implement promotion compute method (#109) (#112)
1 parent 902d82a commit 52f6497

3 files changed

Lines changed: 544 additions & 0 deletions

File tree

app/features/featuresets/service.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,26 @@ def compute_features(
138138
result, cols = self._compute_lifecycle_features(result)
139139
feature_columns.extend(cols)
140140

141+
# 7. Promotion features (PRP-3.1D — Phase 2)
142+
if self.config.promotion_config:
143+
promotion_rows_df = getattr(self, "_promotion_rows_df", None)
144+
if promotion_rows_df is None:
145+
# PRP-3.1E wires the DB JOIN that sets this attribute.
146+
# In unit tests, the test sets it directly on the service.
147+
# An empty DataFrame is the safe no-op fallback.
148+
promotion_rows_df = pd.DataFrame(
149+
columns=[
150+
"product_id",
151+
"store_id",
152+
"kind",
153+
"discount_pct",
154+
"start_date",
155+
"end_date",
156+
]
157+
)
158+
result, cols = self._compute_promotion_features(result, promotion_rows_df)
159+
feature_columns.extend(cols)
160+
141161
# Compute stats
142162
null_counts: dict[str, int] = {}
143163
if feature_columns:
@@ -491,6 +511,116 @@ def _compute_lifecycle_features(self, df: pd.DataFrame) -> tuple[pd.DataFrame, l
491511

492512
return result, columns
493513

514+
def _compute_promotion_features(
515+
self,
516+
df: pd.DataFrame,
517+
promotion_rows_df: pd.DataFrame,
518+
) -> tuple[pd.DataFrame, list[str]]:
519+
"""Compute promotion-family features (active + intensity per kind).
520+
521+
CRITICAL: Time-safe via ``groupby(entity_cols).shift(lag_days)`` on
522+
a daily-grain indicator. Per the time-safety contract, the active
523+
indicator at row D reads activity from D - lag_days. A promotion
524+
active on D itself must NOT appear in active_lag{N} at D.
525+
526+
Date-range semantics: ``start_date <= D <= end_date`` (both inclusive).
527+
528+
Chain-wide promotions: rows with ``store_id`` NaN/None apply to
529+
EVERY store of that product. Handled via a two-pass match (store-
530+
specific OR chain-wide), never via a NaN-key merge.
531+
532+
Overlapping promotions on the same kind/day reduce via ``max`` over
533+
``discount_pct`` for intensity (Decision §15-C); active stays 0/1.
534+
535+
Args:
536+
df: Sales DataFrame, pre-sorted and cutoff-filtered (per
537+
compute_features pipeline).
538+
promotion_rows_df: Promotion rows. Columns:
539+
``[product_id, store_id, kind, discount_pct, start_date, end_date]``.
540+
``store_id`` may be NaN (chain-wide). ``discount_pct`` may
541+
be NaN (bogo / bundle kinds).
542+
543+
Returns:
544+
Tuple of (DataFrame with new columns, list of new column names).
545+
"""
546+
config = self.config.promotion_config
547+
if config is None:
548+
raise RuntimeError("_compute_promotion_features called without promotion_config")
549+
550+
result = df.copy()
551+
columns: list[str] = []
552+
lag = config.lag_days
553+
554+
# Defensive re-sort to match the caller invariant.
555+
result = result.sort_values([*self.entity_cols, self.date_col])
556+
dates = pd.to_datetime(result[self.date_col]).dt.date
557+
558+
# Deterministic column ordering: sorted kinds, active before intensity.
559+
sorted_kinds: tuple[str, ...] = tuple(sorted(config.kinds_to_track))
560+
561+
for kind in sorted_kinds:
562+
kind_rows = promotion_rows_df[promotion_rows_df["kind"] == kind]
563+
564+
# Per-row daily indicators (D-day truth, BEFORE lag shift).
565+
active_today: pd.Series[Any] = pd.Series(0, index=result.index, dtype="int64")
566+
intensity_today: pd.Series[Any] = pd.Series(np.nan, index=result.index, dtype="float64")
567+
568+
# Two-pass match: store-specific then chain-wide. Never merge on NaN keys.
569+
store_specific = kind_rows[kind_rows["store_id"].notna()]
570+
chain_wide = kind_rows[kind_rows["store_id"].isna()]
571+
572+
for _, promo in store_specific.iterrows():
573+
mask = (
574+
(result["store_id"] == promo["store_id"])
575+
& (result["product_id"] == promo["product_id"])
576+
& (dates >= promo["start_date"])
577+
& (dates <= promo["end_date"])
578+
)
579+
active_today = active_today.where(~mask, 1)
580+
disc = promo["discount_pct"]
581+
if pd.notna(disc):
582+
# Overlapping-on-same-kind reduction = max (Decision §15-C).
583+
masked_disc = intensity_today.where(~mask, float(disc))
584+
intensity_today = pd.concat([intensity_today, masked_disc], axis=1).max(axis=1)
585+
586+
for _, promo in chain_wide.iterrows():
587+
mask = (
588+
(result["product_id"] == promo["product_id"])
589+
& (dates >= promo["start_date"])
590+
& (dates <= promo["end_date"])
591+
)
592+
active_today = active_today.where(~mask, 1)
593+
disc = promo["discount_pct"]
594+
if pd.notna(disc):
595+
masked_disc = intensity_today.where(~mask, float(disc))
596+
intensity_today = pd.concat([intensity_today, masked_disc], axis=1).max(axis=1)
597+
598+
# CRITICAL: groupby(entity_cols).shift(lag) — the leakage gate.
599+
# Feature at row D reads daily indicator at D - lag.
600+
if config.include_active:
601+
col = f"promo_{kind}_active_lag{lag}"
602+
shifted_active = (
603+
result.assign(_a=active_today)
604+
.groupby(self.entity_cols, observed=True)["_a"]
605+
.shift(lag)
606+
)
607+
# Nullable Int64 preserves NaN at the start of each series
608+
# (mirrors the lag-feature idiom — Decision §15-D).
609+
result[col] = shifted_active.astype("Int64")
610+
columns.append(col)
611+
612+
if config.include_intensity:
613+
col = f"promo_{kind}_intensity_lag{lag}"
614+
shifted_intensity = (
615+
result.assign(_i=intensity_today)
616+
.groupby(self.entity_cols, observed=True)["_i"]
617+
.shift(lag)
618+
)
619+
result[col] = shifted_intensity.astype("float64")
620+
columns.append(col)
621+
622+
return result, columns
623+
494624

495625
class FeatureDataLoader:
496626
"""Async data loader for feature computation.

app/features/featuresets/tests/test_leakage.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
FeatureSetConfig,
1414
LagConfig,
1515
LifecycleConfig,
16+
PromotionConfig,
1617
RollingConfig,
1718
)
1819
from app.features.featuresets.service import FeatureEngineeringService
@@ -418,3 +419,155 @@ def test_lifecycle_group_isolation_no_cross_product_leakage(
418419
f"days_since_launch_lag1={actual}, expected={base_lag}. "
419420
"Lifecycle lag is mixing across products."
420421
)
422+
423+
424+
class TestPromotionLeakage:
425+
"""Tests verifying promotion features never use future data.
426+
427+
PRP-3.1D — these leakage cases are LOAD-BEARING. They assert that a
428+
promotion active on day D MUST NOT appear in day D's
429+
``promo_<kind>_active_lag1`` column; it appears at day D+1 only. The
430+
date-range semantics (start_date <= D <= end_date, both inclusive)
431+
plus ``groupby(...).shift(lag_days)`` are the mathematical leakage gate.
432+
"""
433+
434+
def test_promotion_active_no_leakage_at_same_day(
435+
self,
436+
sample_time_series: pd.DataFrame,
437+
phase2_promotion_rows_df: pd.DataFrame,
438+
) -> None:
439+
"""CRITICAL: A promotion active on day D MUST NOT appear in lag1 at D."""
440+
config = FeatureSetConfig(
441+
name="test",
442+
entity_columns=("store_id", "product_id"),
443+
promotion_config=PromotionConfig(
444+
kinds_to_track=("markdown",),
445+
include_active=True,
446+
include_intensity=False,
447+
lag_days=1,
448+
),
449+
)
450+
service = FeatureEngineeringService(config)
451+
service._promotion_rows_df = phase2_promotion_rows_df # type: ignore[attr-defined]
452+
result = service.compute_features(sample_time_series)
453+
454+
# The fixture's markdown is active 2024-01-07 .. 2024-01-14 (8 days).
455+
# promo_markdown_active_lag1 should be 1 on 2024-01-08 .. 2024-01-15.
456+
df = result.df.reset_index(drop=True)
457+
dates = pd.to_datetime(df["date"]).dt.date
458+
459+
# Day BEFORE start (D=Jan 6): lag1 reads Jan 5 — inactive. EXPECT 0.
460+
assert df.loc[dates == date(2024, 1, 6), "promo_markdown_active_lag1"].iloc[0] == 0
461+
462+
# Day OF start (D=Jan 7): lag1 reads Jan 6 — inactive. EXPECT 0.
463+
# This is the load-bearing leakage check: same-day MUST NOT leak.
464+
assert df.loc[dates == date(2024, 1, 7), "promo_markdown_active_lag1"].iloc[0] == 0, (
465+
"LEAKAGE DETECTED: promo active on day D appeared in active_lag1 at day D"
466+
)
467+
468+
# Day AFTER start (D=Jan 8): lag1 reads Jan 7 — active. EXPECT 1.
469+
assert df.loc[dates == date(2024, 1, 8), "promo_markdown_active_lag1"].iloc[0] == 1
470+
471+
# Day AFTER end (D=Jan 15): lag1 reads Jan 14 — last active day. EXPECT 1.
472+
assert df.loc[dates == date(2024, 1, 15), "promo_markdown_active_lag1"].iloc[0] == 1
473+
474+
# Two days AFTER end (D=Jan 16): lag1 reads Jan 15 — inactive. EXPECT 0.
475+
assert df.loc[dates == date(2024, 1, 16), "promo_markdown_active_lag1"].iloc[0] == 0
476+
477+
def test_promotion_boundary_end_date_at_cutoff(
478+
self,
479+
sample_time_series: pd.DataFrame,
480+
) -> None:
481+
"""A promo ending exactly on cutoff_date - 1 yields active_lag1=1 at cutoff."""
482+
cutoff = date(2024, 1, 15)
483+
promo_rows = pd.DataFrame(
484+
{
485+
"product_id": [1],
486+
"store_id": [1],
487+
"kind": ["markdown"],
488+
"discount_pct": [0.20],
489+
"start_date": [date(2024, 1, 10)],
490+
"end_date": [date(2024, 1, 14)], # cutoff - 1
491+
}
492+
)
493+
config = FeatureSetConfig(
494+
name="test",
495+
entity_columns=("store_id", "product_id"),
496+
promotion_config=PromotionConfig(kinds_to_track=("markdown",), lag_days=1),
497+
)
498+
service = FeatureEngineeringService(config)
499+
service._promotion_rows_df = promo_rows # type: ignore[attr-defined]
500+
result = service.compute_features(sample_time_series, cutoff_date=cutoff)
501+
502+
df = result.df.reset_index(drop=True)
503+
dates = pd.to_datetime(df["date"]).dt.date
504+
# At cutoff (Jan 15), lag1 reads Jan 14 — end_date, INCLUSIVE → active.
505+
last = df.loc[dates == cutoff].iloc[0]
506+
assert last["promo_markdown_active_lag1"] == 1, (
507+
"Boundary leakage: end_date INCLUSIVE on the previous day failed"
508+
)
509+
510+
def test_promotion_starts_on_cutoff_not_in_lag1(
511+
self,
512+
sample_time_series: pd.DataFrame,
513+
) -> None:
514+
"""A promo starting exactly on cutoff is NOT in active_lag1 at cutoff."""
515+
cutoff = date(2024, 1, 15)
516+
promo_rows = pd.DataFrame(
517+
{
518+
"product_id": [1],
519+
"store_id": [1],
520+
"kind": ["markdown"],
521+
"discount_pct": [0.20],
522+
"start_date": [cutoff], # starts today
523+
"end_date": [date(2024, 1, 25)],
524+
}
525+
)
526+
config = FeatureSetConfig(
527+
name="test",
528+
entity_columns=("store_id", "product_id"),
529+
promotion_config=PromotionConfig(kinds_to_track=("markdown",), lag_days=1),
530+
)
531+
service = FeatureEngineeringService(config)
532+
service._promotion_rows_df = promo_rows # type: ignore[attr-defined]
533+
result = service.compute_features(sample_time_series, cutoff_date=cutoff)
534+
535+
df = result.df.reset_index(drop=True)
536+
dates = pd.to_datetime(df["date"]).dt.date
537+
last = df.loc[dates == cutoff].iloc[0]
538+
# lag1 reads cutoff - 1 = Jan 14, BEFORE start_date.
539+
assert last["promo_markdown_active_lag1"] == 0, (
540+
"Same-day leakage: promo starting on D appeared in active_lag1 at D"
541+
)
542+
543+
def test_chain_wide_promo_does_not_bleed_across_products(
544+
self,
545+
multi_series_time_series: pd.DataFrame,
546+
) -> None:
547+
"""A chain-wide promo on product=1 must NOT activate features for product=2."""
548+
promo_rows = pd.DataFrame(
549+
{
550+
"product_id": [1],
551+
"store_id": [None], # chain-wide
552+
"kind": ["markdown"],
553+
"discount_pct": [0.30],
554+
"start_date": [date(2024, 1, 3)],
555+
"end_date": [date(2024, 1, 7)],
556+
}
557+
)
558+
config = FeatureSetConfig(
559+
name="test",
560+
entity_columns=("store_id", "product_id"),
561+
promotion_config=PromotionConfig(kinds_to_track=("markdown",), lag_days=1),
562+
)
563+
service = FeatureEngineeringService(config)
564+
service._promotion_rows_df = promo_rows # type: ignore[attr-defined]
565+
result = service.compute_features(multi_series_time_series)
566+
567+
df = result.df
568+
# Product 1 should see activity 2024-01-04 .. 2024-01-08 (lag1) -- 5 days x 2 stores.
569+
prod1 = df[df["product_id"] == 1]
570+
assert int(prod1["promo_markdown_active_lag1"].sum()) == 5 * 2
571+
# Product 2 should see ZERO activity (chain-wide is product-scoped).
572+
prod2 = df[df["product_id"] == 2]
573+
assert int(prod2["promo_markdown_active_lag1"].sum()) == 0

0 commit comments

Comments
 (0)