Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
539 changes: 530 additions & 9 deletions app/features/demo/pipeline.py

Large diffs are not rendered by default.

275 changes: 269 additions & 6 deletions app/features/demo/tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,15 @@ def _canned_response(
"sales": 500,
}
if path.startswith("/dimensions/stores"):
# page_size=5 is the PRP-39 batch_preset discovery call; return 3 stores
# so the step doesn't skip. Other callers ask for page_size=1; either
# way the first item is the showcase grain (id=7).
if "page_size=5" in path:
return {"stores": [{"id": 7}, {"id": 8}, {"id": 9}]}
return {"stores": [{"id": 7}]}
if path.startswith("/dimensions/products"):
if "page_size=5" in path:
return {"products": [{"id": 3}, {"id": 4}]}
return {"products": [{"id": 3}]}
if path == "/featuresets/compute":
return {"row_count": 80, "feature_columns": ["lag_1", "roll_7", "dow"]}
Expand Down Expand Up @@ -113,10 +120,70 @@ def _canned_response(
"feature_groups": {"target_history": ["lag_1", "lag_7"], "calendar": ["dow", "month"]},
"feature_safety_classes": {"lag_1": "leak_safe"},
}
if path.startswith("/registry/runs?"):
# PRP-39 — champion_compat_compare lists SUCCESS runs on the grain.
return {
"runs": [
{"run_id": "v1-baseline-run-id-aaaa", "feature_frame_version": None},
{"run_id": "demo-run-abc123def456", "feature_frame_version": 2},
],
}
if path.startswith("/registry/compare/"):
# PRP-39 — champion_compat_compare GETs the compare envelope.
return {
"run_a": {
"run_id": "v1-baseline-run-id-aaaa",
"feature_frame_version": None,
},
"run_b": {
"run_id": "demo-run-abc123def456",
"feature_frame_version": 2,
},
"config_diff": {},
"metrics_diff": {},
}
if path.startswith("/registry/runs/"): # PATCH pending->running->success
return {}
if path == "/registry/aliases":
return {}
if path.startswith("/registry/aliases/"):
# PRP-39 — safer_promote_flow GETs the current alias target before swap.
return {
"alias_name": "demo-production",
"run_id": "demo-run-abc123def456",
"description": "current target",
}
if path == "/ops/summary":
# PRP-39 — stale_alias_trigger GETs after registering a V=3 run.
return {
"aliases": [
{
"alias_name": "demo-production",
"stale_reason": "feature_frame_version_mismatch",
"alias_feature_frame_version": 2,
"comparable_run_feature_frame_version": 3,
}
]
}
if path == "/batch/forecasting":
# PRP-39 — batch_preset POSTs the preset expansion. Return terminal
# COMPLETED status (per D3, settles synchronously in most cases).
return {
"batch_id": "batch-demo-abcdef0123",
"status": "completed",
"total_items": 18,
"completed_items": 18,
"failed_items": 0,
}
if path.startswith("/batch/"):
# Safety-net poll path (rare in canned fast tests).
return {
"batch_id": path.split("/")[-1],
"status": "completed",
"total_items": 18,
"completed_items": 18,
"failed_items": 0,
}
raise AssertionError(f"unexpected request path: {path}")


Expand Down Expand Up @@ -379,7 +446,13 @@ def test_phase_table_demo_minimal_matches_legacy_11_steps():


def test_phase_table_showcase_rich_adds_v2_steps():
"""PRP-38 — phase_table for SHOWCASE_RICH adds 3 steps; phase order stable."""
"""PRP-38/39 — phase_table for SHOWCASE_RICH adds 3+4 steps; phase order stable.

PRP-38 shipped 3 (phase2_enrichment, historical_backfill, v2_train).
PRP-39 adds 4 more (champion_compat_compare, stale_alias_trigger,
safer_promote_flow, batch_preset) AND a new ``portfolio`` phase between
``decision`` and ``verify``. Total: 18 rows across 7 phases.
"""
rows = pipeline._phase_table(ScenarioPreset.SHOWCASE_RICH)
by_phase_step = [(p, s) for p, s, _fn in rows]
assert by_phase_step == [
Expand All @@ -394,6 +467,12 @@ def test_phase_table_showcase_rich_adds_v2_steps():
("modeling", "v2_train"),
("decision", "backtest"),
("decision", "register"),
# PRP-39 — three decision-phase extensions after register.
("decision", "champion_compat_compare"),
("decision", "stale_alias_trigger"),
("decision", "safer_promote_flow"),
# PRP-39 — new portfolio phase between decision and verify.
("portfolio", "batch_preset"),
("verify", "verify"),
("agent", "agent"),
("cleanup", "cleanup"),
Expand Down Expand Up @@ -502,8 +581,13 @@ async def test_run_pipeline_showcase_rich_runs_v2_and_buckets(monkeypatch, tmp_p
assert final.data["v2_run_id"] == "demo-run-abc123def456"


async def test_run_pipeline_showcase_rich_emits_14_steps(monkeypatch, tmp_path):
"""PRP-38 — SHOWCASE_RICH adds 3 new steps (11 -> 14 total)."""
async def test_run_pipeline_showcase_rich_emits_18_steps(monkeypatch, tmp_path):
"""PRP-38/39 — SHOWCASE_RICH adds 3+4 new steps (11 -> 18 total).

PRP-38 shipped 14 (11 + phase2_enrichment + historical_backfill + v2_train).
PRP-39 adds 4 more (champion_compat_compare + stale_alias_trigger +
safer_promote_flow + batch_preset).
"""
artifact = tmp_path / "artifacts" / "models" / "model_x.joblib"
artifact.parent.mkdir(parents=True, exist_ok=True)
artifact.write_bytes(b"x")
Expand All @@ -514,7 +598,186 @@ async def test_run_pipeline_showcase_rich_emits_14_steps(monkeypatch, tmp_path):
req = DemoRunRequest(scenario=ScenarioPreset.SHOWCASE_RICH)
events = [e async for e in pipeline.run_pipeline(app=_FAKE_APP, req=req)]
completes = [e for e in events if e.event_type == "step_complete"]
assert len(completes) == 14
# Every event reports total_steps=14
assert len(completes) == 18
# Every event reports total_steps=18
for ev in completes:
assert ev.total_steps == 14
assert ev.total_steps == 18


# =============================================================================
# PRP-39 — per-step unit tests (canned ASGI HTTP)
# =============================================================================


def _make_ctx_showcase_ready() -> pipeline.DemoContext:
"""Build a DemoContext with the fields PRP-39 steps consume already set."""
from datetime import date

ctx = pipeline.DemoContext(
seed=42,
skip_seed=True,
reset=False,
scenario=ScenarioPreset.SHOWCASE_RICH,
)
ctx.store_id = 7
ctx.product_id = 3
ctx.date_start = date(2024, 10, 1)
ctx.date_end = date(2024, 12, 31)
ctx.winner_model_type = "prophet_like"
ctx.winner_wape = 0.08
ctx.winning_run_id = "demo-run-abc123def456"
ctx.v2_run_id = "demo-run-abc123def456"
return ctx


def _bind_fake_client(artifact_path: str, wapes: dict[str, float]) -> Any:
"""Construct a fake-client instance for direct step-function invocation."""
fake_class = _build_fake_client(artifact_path, wapes)
return fake_class(_FAKE_APP)


async def test_champion_compat_compare_step_marks_v_mismatch_incompatible(monkeypatch, tmp_path):
"""PRP-39 — champion_compat_compare derives compatible=False on V mismatch."""
artifact = tmp_path / "m.joblib"
artifact.write_bytes(b"x")
monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg")))
client = _bind_fake_client(str(artifact), {"prophet_like": 0.08})

ctx = _make_ctx_showcase_ready()
status, detail, data = await pipeline.step_champion_compat_compare(ctx, client)

assert status == "pass"
assert data["compatible"] is False
assert data["comparable_reason"] == "feature_frame_version_mismatch"
assert data["v1_run_id"] == "v1-baseline-run-id-aaaa"
assert data["v2_run_id"] == "demo-run-abc123def456"
assert data["feature_frame_version_a"] is None
assert data["feature_frame_version_b"] == 2
assert "V_a=1" in detail and "V_b=2" in detail


async def test_champion_compat_compare_step_skips_without_v2_run(monkeypatch, tmp_path):
"""PRP-39 — champion_compat_compare skips when no V2 run exists (R14)."""
artifact = tmp_path / "m.joblib"
artifact.write_bytes(b"x")
monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg")))
client = _bind_fake_client(str(artifact), {})

ctx = _make_ctx_showcase_ready()
ctx.v2_run_id = None
status, detail, _ = await pipeline.step_champion_compat_compare(ctx, client)

assert status == "skip"
assert "showcase_rich" in detail


async def test_stale_alias_trigger_step_surfaces_v_mismatch(monkeypatch, tmp_path):
"""PRP-39 — stale_alias_trigger registers V=3 run and confirms ops verdict."""
artifact = tmp_path / "m.joblib"
artifact.write_bytes(b"x")
monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg")))
client = _bind_fake_client(str(artifact), {"prophet_like": 0.08})

ctx = _make_ctx_showcase_ready()
status, _detail, data = await pipeline.step_stale_alias_trigger(ctx, client)

assert status == "pass"
assert data["alias_name"] == "demo-production"
assert data["stale_reason"] == "feature_frame_version_mismatch"
assert data["alias_feature_frame_version"] == 2
assert data["comparable_run_feature_frame_version"] == 3
assert ctx.stale_alias_run_id == "demo-run-abc123def456"


async def test_safer_promote_flow_step_captures_original_alias(monkeypatch, tmp_path):
"""PRP-39 — safer_promote_flow records original alias for R15 restore."""
artifact = tmp_path / "m.joblib"
artifact.write_bytes(b"x")
monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg")))
client = _bind_fake_client(str(artifact), {"seasonal_naive": 99.0})

ctx = _make_ctx_showcase_ready()
status, _detail, data = await pipeline.step_safer_promote_flow(ctx, client)

assert status == "pass"
assert data["alias_name"] == "demo-production"
assert data["before_run_id"] == "demo-run-abc123def456" # canned GET response
assert data["after_run_id"] == "demo-run-abc123def456" # canned POST returns same id
assert data["swap_intent"] == "demo_safer_promote_walkthrough"
# R15 — original alias captured before swap.
assert ctx.original_demo_alias_run_id == "demo-run-abc123def456"


async def test_batch_preset_step_emits_terminal_completed(monkeypatch, tmp_path):
"""PRP-39 — batch_preset returns pass on terminal completed status (D2/D3)."""
artifact = tmp_path / "m.joblib"
artifact.write_bytes(b"x")
monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg")))
client = _bind_fake_client(str(artifact), {})

ctx = _make_ctx_showcase_ready()
status, detail, data = await pipeline.step_batch_preset(ctx, client)

assert status == "pass"
assert data["batch_id"] == "batch-demo-abcdef0123"
assert data["kind"] == "manual"
assert data["preset_source"] == "quick_baseline_sweep"
assert data["total_items"] == 18
assert data["completed_items"] == 18
assert data["status"] == "completed"
assert "preset=quick_baseline_sweep" in detail
assert ctx.batch_id == "batch-demo-abcdef0123"


async def test_batch_preset_step_skips_without_date_range(monkeypatch, tmp_path):
"""PRP-39 — batch_preset skips gracefully when no date range present."""
artifact = tmp_path / "m.joblib"
artifact.write_bytes(b"x")
monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg")))
client = _bind_fake_client(str(artifact), {})

ctx = _make_ctx_showcase_ready()
ctx.date_start = None
ctx.date_end = None
status, detail, _ = await pipeline.step_batch_preset(ctx, client)

assert status == "skip"
assert "showcase_rich" in detail


async def test_cleanup_restores_alias_when_promote_swapped_it(monkeypatch, tmp_path):
"""PRP-39 R15 — cleanup restores demo-production alias post-swap."""
artifact = tmp_path / "m.joblib"
artifact.write_bytes(b"x")
monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg")))
client = _bind_fake_client(str(artifact), {})

ctx = _make_ctx_showcase_ready()
ctx.original_demo_alias_run_id = "original-v2-winner-run-id"
# No agent session opened
ctx.session_id = None

status, detail, data = await pipeline.step_cleanup(ctx, client)

assert status == "pass"
assert data["alias_restored"] is True
assert data["restored_run_id"] == "original-v2-winner-run-id"
assert "alias restored" in detail


async def test_cleanup_skips_when_nothing_to_restore_or_close(monkeypatch, tmp_path):
"""PRP-39 — cleanup is a no-op skip when no agent + no alias swap occurred."""
artifact = tmp_path / "m.joblib"
artifact.write_bytes(b"x")
monkeypatch.setattr(pipeline, "get_settings", lambda: _fake_settings(str(tmp_path / "reg")))
client = _bind_fake_client(str(artifact), {})

ctx = _make_ctx_showcase_ready()
ctx.session_id = None
ctx.original_demo_alias_run_id = None # PRP-39 — no swap to restore

status, _detail, data = await pipeline.step_cleanup(ctx, client)

assert status == "skip"
assert data["alias_restored"] is False
assert data["agent_session_closed"] is False
Loading