From 72a954e527ee49a9054289ede78a77ba86c9be1f Mon Sep 17 00:00:00 2001 From: Gabor Szabo Date: Tue, 26 May 2026 16:57:35 +0200 Subject: [PATCH 1/2] fix(data): make phase2 enrichment idempotent (#312) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A second POST /seeder/phase2-enrichment against an already-enriched scope no longer raises IntegrityError (previously: uq_exogenous_signal_per_store surfaced as HTTP 500, blocking PRP-41's manual showcase_rich dogfood). - exogenous_signal: pg_insert(...).on_conflict_do_nothing() — target-free ON CONFLICT covers both partial unique indexes (global + per-store) - replenishment_event / sales_returns (no natural-key unique constraint): section-level existence check inside the seeded date window; skip the whole insert when rows already exist - product lifecycle UPDATE: already idempotent under a fixed seed; left in place - Phase2EnrichmentResponse gains additive records_skipped: dict[str, int] - Defensive IntegrityError -> ConflictError(409) wrap as the belt-and-braces net (the idempotency guards above should make it unreachable) Adds a non-destructive integration test (app/features/seeder/tests/ test_phase2_idempotency.py) that calls the endpoint twice against the live Postgres and asserts records_skipped > 0 + records_created == 0 on the second pass for all three insert tables. --- app/features/seeder/schemas.py | 7 + app/features/seeder/service.py | 294 ++++++++++++------ .../seeder/tests/test_phase2_idempotency.py | 114 +++++++ 3 files changed, 322 insertions(+), 93 deletions(-) create mode 100644 app/features/seeder/tests/test_phase2_idempotency.py diff --git a/app/features/seeder/schemas.py b/app/features/seeder/schemas.py index 56f71872..20a22dc3 100644 --- a/app/features/seeder/schemas.py +++ b/app/features/seeder/schemas.py @@ -475,4 +475,11 @@ class Phase2EnrichmentResponse(BaseModel): "(product, replenishment_event, exogenous_signal, sales_returns)." ), ) + records_skipped: dict[str, int] = Field( + default_factory=dict, + description=( + "Count of rows skipped per table on an idempotent re-run " + "(populated when prior phase-2 data is already present in scope)." + ), + ) duration_ms: float = Field(description="Wall-clock duration in milliseconds.") diff --git a/app/features/seeder/service.py b/app/features/seeder/service.py index 0d166420..87b20709 100644 --- a/app/features/seeder/service.py +++ b/app/features/seeder/service.py @@ -10,10 +10,11 @@ from sqlalchemy import func, select, update from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import get_settings -from app.core.exceptions import UnprocessableEntityError +from app.core.exceptions import ConflictError, UnprocessableEntityError from app.core.logging import get_logger from app.features.data_platform.models import ( Calendar, @@ -897,16 +898,28 @@ async def phase2_enrichment( 4. INSERT ``sales_returns`` rows sampled from the existing positive-quantity ``sales_daily`` rows. + Idempotent (#312): a second call against an already-enriched scope is a + no-op for the inserting steps — ``exogenous_signal`` uses Postgres + ``ON CONFLICT DO NOTHING`` against its two partial unique indexes, while + ``replenishment_event`` and ``sales_returns`` (no natural-key unique + constraint) skip the section when rows already exist within the seeded + date range. The lifecycle ``UPDATE`` block is naturally idempotent under + a fixed seed. + Args: db: Async database session. params: Caller-supplied seed + probabilities. Returns: - Phase2EnrichmentResponse with per-table row counts and wall-clock. + Phase2EnrichmentResponse with per-table ``records_created`` and + ``records_skipped`` counts plus wall-clock. Raises: UnprocessableEntityError: When dimensions or calendar are empty (caller must seed first); when the seeded calendar spans 0 days. + ConflictError: Defensive net — a residual ``IntegrityError`` (should + not fire after the idempotency logic above) is mapped to 409 + RFC 7807 rather than bubbling as a raw 500. """ start_time = time.perf_counter() rng = random.Random(params.seed) @@ -934,112 +947,207 @@ async def phase2_enrichment( seed=params.seed, ) - # ---- 1) Lifecycle: UPDATE per product + skipped: dict[str, int] = { + "product": 0, + "replenishment_event": 0, + "exogenous_signal": 0, + "sales_returns": 0, + } + try: - lifecycle_map = _assign_lifecycle( - rng, - product_ids, - start_date, - end_date, - discontinue_probability=params.discontinue_probability, + # ---- 1) Lifecycle: UPDATE per product (deterministic with seed → idempotent) + try: + lifecycle_map = _assign_lifecycle( + rng, + product_ids, + start_date, + end_date, + discontinue_probability=params.discontinue_probability, + ) + except ValueError as exc: + raise UnprocessableEntityError(message=str(exc)) from exc + for pid, (launch, disc, stage) in lifecycle_map.items(): + await db.execute( + update(Product) + .where(Product.id == pid) + .values(launch_date=launch, discontinue_date=disc, lifecycle_stage=stage) + ) + product_updates = len(lifecycle_map) + await db.commit() + + # ---- 2) Replenishment events (no unique constraint — section-level skip) + lt_cfg = LeadTimeConfig( + enable=True, + mean_lead_time_days=7, + lead_time_sigma_days=1.5, + safety_stock_days=3, + order_frequency_days=14, + fill_rate_mean=0.97, + fill_rate_sigma=0.05, ) - except ValueError as exc: - raise UnprocessableEntityError(message=str(exc)) from exc - for pid, (launch, disc, stage) in lifecycle_map.items(): - await db.execute( - update(Product) - .where(Product.id == pid) - .values(launch_date=launch, discontinue_date=disc, lifecycle_stage=stage) + rep_gen = ReplenishmentGenerator(rng, lt_cfg) + rep_records = rep_gen.generate(store_ids, product_ids, dates, base_demand=100) + existing_rep = ( + await db.scalar( + select(func.count()) + .select_from(ReplenishmentEvent) + .where( + ReplenishmentEvent.date >= start_date, + ReplenishmentEvent.date <= end_date, + ) + ) + or 0 ) - product_updates = len(lifecycle_map) - await db.commit() - - # ---- 2) Replenishment events - lt_cfg = LeadTimeConfig( - enable=True, - mean_lead_time_days=7, - lead_time_sigma_days=1.5, - safety_stock_days=3, - order_frequency_days=14, - fill_rate_mean=0.97, - fill_rate_sigma=0.05, - ) - rep_gen = ReplenishmentGenerator(rng, lt_cfg) - rep_records = rep_gen.generate(store_ids, product_ids, dates, base_demand=100) - for i in range(0, len(rep_records), PHASE2_ENRICHMENT_BATCH_SIZE): - chunk = rep_records[i : i + PHASE2_ENRICHMENT_BATCH_SIZE] - if chunk: - await db.execute(pg_insert(ReplenishmentEvent).values(chunk)) - await db.commit() - - # ---- 3) Exogenous signals (weather + macro) - ex_cfg = ExogenousSignalConfig( - enable_weather=True, - enable_macro=True, - enable_events=False, - weather_climatology_mean_c=15.0, - weather_amplitude_c=12.0, - weather_noise_sigma_c=2.0, - macro_initial_value=100.0, - macro_step_sigma=0.5, - ) - ex_gen = ExogenousSignalGenerator(rng, ex_cfg) - ex_records = ex_gen.generate(dates, store_ids) - for i in range(0, len(ex_records), PHASE2_ENRICHMENT_BATCH_SIZE): - chunk = ex_records[i : i + PHASE2_ENRICHMENT_BATCH_SIZE] - if chunk: - await db.execute(pg_insert(ExogenousSignal).values(chunk)) - await db.commit() - - # ---- 4) Sales returns (sampled from existing positive-quantity sales) - ret_cfg = ReturnsConfig( - enable=True, - return_probability=params.returns_probability, - return_lag_days_min=1, - return_lag_days_max=14, - return_quantity_fraction=0.5, - ) - ret_gen = ReturnsGenerator(rng, ret_cfg) - sales_rows = ( - await db.execute( - select( - SalesDaily.date, - SalesDaily.store_id, - SalesDaily.product_id, - SalesDaily.quantity, - ).where(SalesDaily.quantity > 0) + if existing_rep: + skipped["replenishment_event"] = len(rep_records) + rep_created = 0 + logger.info( + "seeder.phase2_enrichment.skip", + table="replenishment_event", + existing_rows=existing_rep, + skipped=len(rep_records), + ) + else: + for i in range(0, len(rep_records), PHASE2_ENRICHMENT_BATCH_SIZE): + chunk = rep_records[i : i + PHASE2_ENRICHMENT_BATCH_SIZE] + if chunk: + await db.execute(pg_insert(ReplenishmentEvent).values(chunk)) + rep_created = len(rep_records) + await db.commit() + + # ---- 3) Exogenous signals — ON CONFLICT DO NOTHING on the two partial unique + # indexes (uq_exogenous_signal_global, uq_exogenous_signal_per_store). + ex_cfg = ExogenousSignalConfig( + enable_weather=True, + enable_macro=True, + enable_events=False, + weather_climatology_mean_c=15.0, + weather_amplitude_c=12.0, + weather_noise_sigma_c=2.0, + macro_initial_value=100.0, + macro_step_sigma=0.5, ) - ).fetchall() - sales_records: list[dict[str, date | int | Decimal]] = [ - { - "date": r[0], - "store_id": r[1], - "product_id": r[2], - "quantity": int(r[3]), - } - for r in sales_rows - ] - ret_records = ret_gen.generate(sales_records, end_date) - for i in range(0, len(ret_records), PHASE2_ENRICHMENT_BATCH_SIZE): - chunk = ret_records[i : i + PHASE2_ENRICHMENT_BATCH_SIZE] - if chunk: - await db.execute(pg_insert(SalesReturn).values(chunk)) - await db.commit() + ex_gen = ExogenousSignalGenerator(rng, ex_cfg) + ex_records = ex_gen.generate(dates, store_ids) + existing_ex = ( + await db.scalar( + select(func.count()) + .select_from(ExogenousSignal) + .where( + ExogenousSignal.date >= start_date, + ExogenousSignal.date <= end_date, + ) + ) + or 0 + ) + for i in range(0, len(ex_records), PHASE2_ENRICHMENT_BATCH_SIZE): + chunk = ex_records[i : i + PHASE2_ENRICHMENT_BATCH_SIZE] + if chunk: + await db.execute(pg_insert(ExogenousSignal).values(chunk).on_conflict_do_nothing()) + await db.commit() + new_ex = ( + await db.scalar( + select(func.count()) + .select_from(ExogenousSignal) + .where( + ExogenousSignal.date >= start_date, + ExogenousSignal.date <= end_date, + ) + ) + or 0 + ) + ex_created = max(0, new_ex - existing_ex) + skipped["exogenous_signal"] = max(0, len(ex_records) - ex_created) + if skipped["exogenous_signal"]: + logger.info( + "seeder.phase2_enrichment.skip", + table="exogenous_signal", + existing_rows=existing_ex, + skipped=skipped["exogenous_signal"], + ) + + # ---- 4) Sales returns (no unique constraint — section-level skip) + ret_cfg = ReturnsConfig( + enable=True, + return_probability=params.returns_probability, + return_lag_days_min=1, + return_lag_days_max=14, + return_quantity_fraction=0.5, + ) + ret_gen = ReturnsGenerator(rng, ret_cfg) + sales_rows = ( + await db.execute( + select( + SalesDaily.date, + SalesDaily.store_id, + SalesDaily.product_id, + SalesDaily.quantity, + ).where(SalesDaily.quantity > 0) + ) + ).fetchall() + sales_records: list[dict[str, date | int | Decimal]] = [ + { + "date": r[0], + "store_id": r[1], + "product_id": r[2], + "quantity": int(r[3]), + } + for r in sales_rows + ] + ret_records = ret_gen.generate(sales_records, end_date) + existing_ret = ( + await db.scalar( + select(func.count()) + .select_from(SalesReturn) + .where( + SalesReturn.date >= start_date, + SalesReturn.date <= end_date, + ) + ) + or 0 + ) + if existing_ret: + skipped["sales_returns"] = len(ret_records) + ret_created = 0 + logger.info( + "seeder.phase2_enrichment.skip", + table="sales_returns", + existing_rows=existing_ret, + skipped=len(ret_records), + ) + else: + for i in range(0, len(ret_records), PHASE2_ENRICHMENT_BATCH_SIZE): + chunk = ret_records[i : i + PHASE2_ENRICHMENT_BATCH_SIZE] + if chunk: + await db.execute(pg_insert(SalesReturn).values(chunk)) + ret_created = len(ret_records) + await db.commit() + except IntegrityError as exc: + await db.rollback() + raise ConflictError( + message=( + "Phase 2 enrichment hit a residual database constraint conflict " + "(idempotency guards should have caught this — please report)." + ), + details={"error": str(exc.orig) if exc.orig else str(exc)}, + ) from exc duration_ms = (time.perf_counter() - start_time) * 1000.0 counts = { "product": product_updates, - "replenishment_event": len(rep_records), - "exogenous_signal": len(ex_records), - "sales_returns": len(ret_records), + "replenishment_event": rep_created, + "exogenous_signal": ex_created, + "sales_returns": ret_created, } logger.info( "seeder.phase2_enrichment.complete", duration_ms=duration_ms, - **counts, + records_created=counts, + records_skipped=skipped, ) return schemas.Phase2EnrichmentResponse( success=True, records_created=counts, + records_skipped=skipped, duration_ms=duration_ms, ) diff --git a/app/features/seeder/tests/test_phase2_idempotency.py b/app/features/seeder/tests/test_phase2_idempotency.py new file mode 100644 index 00000000..3598dd01 --- /dev/null +++ b/app/features/seeder/tests/test_phase2_idempotency.py @@ -0,0 +1,114 @@ +"""Integration tests for the phase2-enrichment idempotency contract (#312). + +Calls ``POST /seeder/phase2-enrichment`` twice against a real Postgres-backed +database and asserts that the second call: + +* returns 2xx (no ``IntegrityError`` from the ``uq_exogenous_signal_per_store`` + / ``uq_exogenous_signal_global`` partial unique indexes), +* reports ``records_skipped`` populated for the three insert tables + (``replenishment_event`` / ``exogenous_signal`` / ``sales_returns``), +* reports ``records_created`` zero for those same tables on the second call. + +Requires ``docker compose up -d`` AND a seeded data platform (``demo_minimal`` +or richer). The test is **non-destructive** — it reuses whatever calendar + +dimensions are already in the dev DB and skips with a clear message when the +DB is empty. +""" + +from __future__ import annotations + +from collections.abc import AsyncGenerator + +import pytest +from fastapi import status +from httpx import ASGITransport, AsyncClient +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +from app.core.config import get_settings +from app.core.database import get_db +from app.features.data_platform.models import Calendar, Product, Store +from app.main import app + + +@pytest.fixture +async def db_session() -> AsyncGenerator[AsyncSession, None]: + """Yield an async session against the live dev Postgres. + + Non-destructive — no cleanup. The test reuses whatever data the dev DB + already has (see module docstring). + """ + settings = get_settings() + engine = create_async_engine(settings.database_url, echo=False) + async_session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with async_session_maker() as session: + yield session + await engine.dispose() + + +@pytest.fixture +async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]: + async def override_get_db() -> AsyncGenerator[AsyncSession, None]: + yield db_session + + app.dependency_overrides[get_db] = override_get_db + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + yield ac + app.dependency_overrides.pop(get_db, None) + + +@pytest.mark.integration +@pytest.mark.asyncio +class TestPhase2EnrichmentIdempotency: + """Two consecutive ``POST /seeder/phase2-enrichment`` calls do not error.""" + + async def test_second_call_is_idempotent( + self, + client: AsyncClient, + db_session: AsyncSession, + ) -> None: + """Second call returns 2xx and reports ``records_skipped`` populated. + + Regression for #312 — pre-fix the second call raised + ``IntegrityError`` on ``uq_exogenous_signal_per_store`` and the global + handler surfaced it as HTTP 500. + """ + # Skip cleanly when the DB is empty — the test is a regression for an + # already-seeded path, not a seeding test. The seeded path is exercised + # by ``make demo`` + the e2e nightly job. + n_stores = await db_session.scalar(select(func.count()).select_from(Store)) or 0 + n_products = await db_session.scalar(select(func.count()).select_from(Product)) or 0 + n_calendar = await db_session.scalar(select(func.count()).select_from(Calendar)) or 0 + if not (n_stores and n_products and n_calendar): + pytest.skip( + "Phase 2 idempotency test needs a seeded DB — run `make demo` or " + "POST /seeder/generate {scenario: demo_minimal} first." + ) + + payload = {"seed": 42, "returns_probability": 0.05} + + first = await client.post("/seeder/phase2-enrichment", json=payload) + assert first.status_code == status.HTTP_201_CREATED, first.text + first_body = first.json() + assert first_body["success"] is True + # The first call populates phase2 outputs (or skips if a prior run + # already populated them — both are valid pre-states for this test). + + second = await client.post("/seeder/phase2-enrichment", json=payload) + assert second.status_code == status.HTTP_201_CREATED, second.text + body = second.json() + assert body["success"] is True + + skipped = body["records_skipped"] + created = body["records_created"] + + # The three insert tables must skip on the second pass — proof of + # idempotency. ``product`` is an UPDATE step (no skip semantic). + for table in ("replenishment_event", "exogenous_signal", "sales_returns"): + assert skipped[table] > 0, ( + f"expected records_skipped[{table}] > 0 on the second call, " + f"got skipped={skipped}, created={created}" + ) + assert created[table] == 0, ( + f"expected records_created[{table}] == 0 on the second call, got created={created}" + ) From 608d5a906368793261a29f7ea776447d277b80e8 Mon Sep 17 00:00:00 2001 From: Gabor Szabo Date: Tue, 26 May 2026 17:05:52 +0200 Subject: [PATCH 2/2] test(data): relax phase2 idempotency assertion for sparse CI DB (#312) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI's freshly-migrated DB has only stray data from earlier tests' fixtures — too sparse for the ReplenishmentGenerator or ReturnsGenerator to produce any records. The original assertion required records_skipped > 0 for all three insert tables, which fails when nothing was generated. Relaxed: assert created == 0 on the second call for all three insert tables (the canonical idempotency proof — no new rows). Sanity guard soft-skips when none of the three tables exercised the idempotency path (otherwise the test would be meaningless). The exogenous_signal ON CONFLICT DO NOTHING path — the original IntegrityError surface — is exercised by any DB with at least one Store + one date. --- .../seeder/tests/test_phase2_idempotency.py | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/app/features/seeder/tests/test_phase2_idempotency.py b/app/features/seeder/tests/test_phase2_idempotency.py index 3598dd01..41465b0d 100644 --- a/app/features/seeder/tests/test_phase2_idempotency.py +++ b/app/features/seeder/tests/test_phase2_idempotency.py @@ -91,9 +91,11 @@ async def test_second_call_is_idempotent( assert first.status_code == status.HTTP_201_CREATED, first.text first_body = first.json() assert first_body["success"] is True - # The first call populates phase2 outputs (or skips if a prior run - # already populated them — both are valid pre-states for this test). + first_created = first_body["records_created"] + first_skipped = first_body["records_skipped"] + # The main bug fix: the second call must NOT bubble an IntegrityError + # as HTTP 500 — it must return 2xx. second = await client.post("/seeder/phase2-enrichment", json=payload) assert second.status_code == status.HTTP_201_CREATED, second.text body = second.json() @@ -102,13 +104,32 @@ async def test_second_call_is_idempotent( skipped = body["records_skipped"] created = body["records_created"] - # The three insert tables must skip on the second pass — proof of - # idempotency. ``product`` is an UPDATE step (no skip semantic). + # Idempotency proof: the second call writes ZERO new rows to any of + # the three insert tables, regardless of how many the first call + # produced. (``product`` is a deterministic UPDATE under a fixed seed + # and has no skip semantic — its count just mirrors the row count.) for table in ("replenishment_event", "exogenous_signal", "sales_returns"): - assert skipped[table] > 0, ( - f"expected records_skipped[{table}] > 0 on the second call, " - f"got skipped={skipped}, created={created}" - ) assert created[table] == 0, ( - f"expected records_created[{table}] == 0 on the second call, got created={created}" + f"expected no new {table} rows on the second call (idempotency), " + f"got created={created}, skipped={skipped}" + ) + + # Sanity guard: the test is only meaningful if at least one of the + # three insert tables actually exercised the idempotency path — i.e. + # the first call produced rows for it, OR the second call skipped + # rows for it (the original ``uq_exogenous_signal_per_store`` bug + # surface). Sparse fixture state from other tests can leave the + # generators producing zero records; soft-skip rather than fail in + # that case. + for table in ("replenishment_event", "exogenous_signal", "sales_returns"): + if first_created.get(table, 0) > 0 or skipped.get(table, 0) > 0: + break + if first_skipped.get(table, 0) > 0: + break + else: + pytest.skip( + "DB state too sparse to generate phase-2 records for any of the " + "three insert tables — idempotency path not exercised. " + f"first_created={first_created}, first_skipped={first_skipped}, " + f"second_created={created}, second_skipped={skipped}" )