feat(data,repo): local demo tooling + seeder price-history fix#298
Conversation
Bundles three carryover concerns from prior local demo work into one PR.
* fix(data) — PriceHistoryGenerator could emit a row with valid_to <
valid_from when a change roll fired on the window's first day. That
violates ck_price_history_valid_dates and crashed the seeder during
ingest. The fix skips the degenerate row.
* feat(data) — three new local-host scripts that drive the public API
to enrich the demo DB without raw SQL writes:
- seed_phase2_only: re-runs Phase 2 generators (replenishment,
exogenous, returns, lifecycle) against existing dimensions
- seed_historical_activity: submits varied train/predict/backtest
jobs across 2024-Q4 -> 2026-Q1 cutoffs through /jobs
- seed_registry_from_jobs: walks completed train jobs, runs the
canonical pending -> running -> success transition + alias stamps
* chore(repo) — uv.lock refreshes forecastlabai 0.2.18 -> 0.2.19
to match the release-please-merged version bump.
Excluded intentionally: alembic/a2b3c4d5e6f7 + rag/models.py — the
migration is self-marked "local-only demo" (truncates document_chunk,
drops HNSW index, hardcodes 2560 for qwen3) and would wipe any non-
qwen3 user's RAG corpus on upgrade. Stays uncommitted locally.
Reviewer's GuideFixes a bug in the price-history seeder that could generate invalid date windows, and adds three localhost-only demo scripts that seed additional data and historical activity via the public API while updating the lockfile to the current forecastlabai version. Sequence diagram for seed_registry_from_jobs registry backfillsequenceDiagram
participant Script as SeedRegistryFromJobs
participant JobsAPI as JobsAPI
participant RegistryAPI as RegistryAPI
participant FS as RegistryArtifactStorage
Script->>JobsAPI: GET /jobs?job_type=train&status=completed (fetch_completed_train_jobs)
JobsAPI-->>Script: completed train jobs
loop for each completed job
Script->>FS: check/copy model_path to registry_root
Script->>RegistryAPI: POST /registry/runs
alt created
RegistryAPI-->>Script: run_id
Script->>RegistryAPI: PATCH /registry/runs/{run_id} status=running
RegistryAPI-->>Script: updated run
Script->>RegistryAPI: PATCH /registry/runs/{run_id}
activate Script
Script-->>RegistryAPI: status=success, metrics, artifact_uri, artifact_hash
deactivate Script
RegistryAPI-->>Script: updated run
else duplicate config_hash
RegistryAPI-->>Script: 4xx (skip run)
end
end
Script->>Script: select winners per (store_id, product_id)
loop aliases for each winner
Script->>RegistryAPI: POST /registry/aliases (champion/challenger)
RegistryAPI-->>Script: alias created or error
end
Sequence diagram for seed_historical_activity job seedingsequenceDiagram
participant Script as SeedHistoricalActivity
participant JobsAPI as JobsAPI
participant BatchAPI as BatchForecastingAPI
loop for each (store_id, product_id, cutoff, model_type)
Script->>JobsAPI: POST /jobs (job_type=train)
JobsAPI-->>Script: job_id
Script->>JobsAPI: GET /jobs/{job_id} (poll_job)
JobsAPI-->>Script: status, run_id
end
Script->>Script: filter completed train jobs at latest cutoff
loop for each latest run_id
Script->>JobsAPI: POST /jobs (job_type=predict)
JobsAPI-->>Script: job_id
Script->>JobsAPI: GET /jobs/{job_id} (poll_job)
JobsAPI-->>Script: status
end
loop for selected pairs
Script->>JobsAPI: POST /jobs (job_type=backtest)
JobsAPI-->>Script: job_id
Script->>JobsAPI: GET /jobs/{job_id} (poll_job, longer timeout)
JobsAPI-->>Script: status
end
Script->>BatchAPI: POST /batch/forecasting
BatchAPI-->>Script: batch_id, item_count or error
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- In
fetch_completed_train_jobs, the pagination break conditionif page * len(jobs) >= totalcan stop too early when the last page is partially filled; consider using the configuredpage_size(or an accumulated count) instead ofpage * len(jobs)to decide when to exit. - In
seed_phase2_only.chunked, the use of the new generic function syntaxdef chunked[U](...)ties the script to Python 3.12+; if this repo targets earlier versions, switch to theTypeVarpattern (def chunked(items: list[U], size: int) -> Iterator[list[U]]) for broader compatibility.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `fetch_completed_train_jobs`, the pagination break condition `if page * len(jobs) >= total` can stop too early when the last page is partially filled; consider using the configured `page_size` (or an accumulated count) instead of `page * len(jobs)` to decide when to exit.
- In `seed_phase2_only.chunked`, the use of the new generic function syntax `def chunked[U](...)` ties the script to Python 3.12+; if this repo targets earlier versions, switch to the `TypeVar` pattern (`def chunked(items: list[U], size: int) -> Iterator[list[U]]`) for broader compatibility.
## Individual Comments
### Comment 1
<location path="scripts/seed_registry_from_jobs.py" line_range="102-111" />
<code_context>
+ model_type = str(params.get("model_type", ""))
+ if model_type not in {"naive", "seasonal_naive", "moving_average"}:
+ return None # only baselines for this backfill
+ source_path = Path(str(result.get("model_path", "")))
+ if not source_path.exists():
+ # try relative-to-cwd
+ rel = Path.cwd() / source_path
+ if rel.exists():
+ source_path = rel
+ else:
+ return None
+ forecast_run_id = str(result.get("run_id", ""))
+ artifact_uri = f"backfill/{model_type}-{source_path.stem}.joblib"
+ dest = registry_root / artifact_uri
+ dest.parent.mkdir(parents=True, exist_ok=True)
+ if not dest.exists():
+ shutil.copy2(source_path, dest)
+ raw = dest.read_bytes()
+ artifact_hash = hashlib.sha256(raw).hexdigest()
</code_context>
<issue_to_address>
**issue (bug_risk):** Handling of missing or non-file `model_path` values can misbehave when the value is empty or points to a directory.
If `model_path` is missing or empty, `Path("")` resolves to the current directory and `exists()` is True, so `shutil.copy2` will be called on a directory and fail. The same applies if `model_path` points to an existing directory. Consider requiring a non-empty path and checking `source_path.is_file()` (and likewise for the `rel` candidate) before copying; otherwise, skip this entry.
</issue_to_address>
### Comment 2
<location path="scripts/seed_registry_from_jobs.py" line_range="120-129" />
<code_context>
+ artifact_hash = hashlib.sha256(raw).hexdigest()
+
+ # (a) create
+ r = await client.post(
+ "/registry/runs",
+ json={
+ "model_type": model_type,
+ "model_config": _model_config_payload(model_type),
+ "feature_config": None,
+ "data_window_start": str(params.get("start_date")),
+ "data_window_end": str(params.get("end_date")),
+ "store_id": int(params["store_id"]),
+ "product_id": int(params["product_id"]),
+ "agent_context": None,
+ "git_sha": None,
+ },
+ )
+ if r.status_code >= 400:
+ # duplicate config_hash → idempotent skip
+ return None
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Treating all non-2xx `/registry/runs` responses as duplicate-config skips can hide real errors.
`status_code >= 400` is broader than the duplicate-config case and will also catch unrelated 4xx/5xx errors, which then get treated as idempotent skips. That can hide real failures like registry downtime or validation errors. Please either detect the actual duplicate condition (e.g., specific status code or error payload) or at least log/raise on unexpected 4xx/5xx responses so operational issues are visible.
Suggested implementation:
```python
artifact_hash = hashlib.sha256(raw).hexdigest()
# (a) create
r = await client.post(
"/registry/runs",
json={
"model_type": model_type,
"model_config": _model_config_payload(model_type),
"feature_config": None,
"data_window_start": str(params.get("start_date")),
"data_window_end": str(params.get("end_date")),
"store_id": int(params["store_id"]),
"product_id": int(params["product_id"]),
"agent_context": None,
"git_sha": None,
},
)
if r.status_code == 409:
# duplicate config_hash → idempotent skip
return None
if r.status_code >= 400:
# non-duplicate 4xx/5xx should surface as errors
try:
error_detail = r.json()
except Exception:
error_detail = r.text
raise RuntimeError(
f"Failed to create registry run (status {r.status_code}): {error_detail}"
)
model_type = str(params.get("model_type", ""))
```
If your registry API uses a different status code or response shape to indicate "duplicate config" (e.g. 422 with a specific error code in the JSON body), update the `if r.status_code == 409:` condition to match that contract, or refine the check using `error_detail`. Also ensure that `client`, `_model_config_payload`, and `params` are in scope in this function as expected.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Three corrections to register_one and fetch_completed_train_jobs:
* pagination — `page * len(jobs) >= total` stops too early when the
last page is partial. Switch to accumulated-count + short-page
detection (exit when len(jobs) < page_size or len(out) >= total).
* model_path validation — empty / directory paths slipped through
because Path("") resolves to cwd and Path.exists() returns True for
directories. Require non-empty path and Path.is_file() for both the
raw and cwd-relative candidates.
* duplicate detection — `r.status_code >= 400` blanket-swallowed
registry downtime and validation errors as idempotent skips. Narrow
the skip to HTTP 409 (the actual DuplicateRunError code per
registry/routes.py:113) and raise RuntimeError on other 4xx / 5xx
with the response body for diagnostics.
Python 3.12-only `def chunked[U](...)` syntax in seed_phase2_only.py
is intentional — `pyproject.toml:6` already pins `requires-python =
">=3.12"`.
Summary
Bundles three carryover concerns from prior local demo work into one PR. Tracked by #297.
app/shared/seeder/generators/facts.py:PriceHistoryGeneratorcould emit a row withvalid_to < valid_fromwhen a price change rolled on the window's first day. That violatesck_price_history_valid_datesand crashes ingest. The fix skips the degenerate row.scripts/seed_phase2_only.py— re-runs Phase 2 generators (replenishment, exogenous, returns, lifecycle) against existing dimensions; refuses unlessDATABASE_URLresolves tolocalhost/127.0.0.1scripts/seed_historical_activity.py— submits a spread of train/predict/backtest jobs across 2024-Q4 → 2026-Q1 cutoffs via/jobsso the Registry / Jobs / Forecasts dashboards have meaningful contentscripts/seed_registry_from_jobs.py— walks completed train jobs, runs the canonical/registry/runspending → running → successtransition + alias stamping with deterministic stub metricsuv.lockbumpsforecastlabai0.2.18 → 0.2.19 to match the already-merged release-please version bump (mirrors prior#239).Excluded (intentionally)
app/features/rag/models.py(modified locally) +alembic/versions/a2b3c4d5e6f7_rag_embedding_dim_2560_qwen3.py(untracked) — the migration's own docstring marks it "Local-only demo migration". ItTRUNCATEsdocument_chunk, dropsix_chunk_embedding_hnsw, and hardcodesvector(2560)for qwen3-embedding:4b. Shipping it todev/mainwould wipe any non-qwen3 user's RAG corpus on nextalembic upgrade head(settings default is still1536). Stays uncommitted locally. If upstream qwen3 support is wanted, it gets its own PRP (target-dim from settings, non-destructive upgrade path, HNSW → IVFFlat fallback).Validation
uv run ruff check <touched files>uv run ruff format --check <touched files>uv run mypy app/shared/seeder/generators/facts.pymypy app/does not coverscripts/)uv run pytest -v -m \"not integration\" app/shared/seeder/ -k \"facts or phase1_regression or price_history\"Not run:
mypy app/andpyright app/— CI will run these on push (they only coverapp/, not the 3 newscripts/)-m integration) — the seeder fix has unit coverage already; integration tests for the new scripts would need a live API + seeded DBWhy one PR
Per the answer to the user's clarifying question. Alternative was three tiny PRs (seeder fix / uv.lock / scripts) — same content, more churn. Multi-scope commit
data,repois allowed by.claude/rules/commit-format.mdfor cross-cutting work.Test plan
facts.pydevgates (Lint & Format, Type Check, Test, Migration Check)uv run python scripts/seed_phase2_only.py --seed 42against a freshly seeded local DBSummary by Sourcery
Fix price history seeding and add local demo seeding scripts, along with a dependency lockfile bump.
New Features:
Bug Fixes:
Build: