End-to-end pipeline orchestrator #29
Open
smodee wants to merge 9 commits into
bioscancast_questions.csv stores created_date as an Excel serial day number (e.g. 45712). Called without unit="D" and origin="1899-12-30", pd.to_datetime treated those integers as nanoseconds past 1970, yielding garbage dates like 1970-01-01 00:00:00.000045712. The bug was latent (no caller had yet relied on the parsed date), but the new orchestrator's build_forecast_question factory needs an accurate created_at. After the fix, q7's created_date parses to 2025-02-24 as expected.
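The conversion behind the fix can be checked by hand. The sketch below uses only the standard library rather than pandas; the helper name `excel_serial_to_date` is hypothetical and not part of the loader.

```python
from datetime import date, timedelta

# Day zero of Excel's 1900 date system. Using 1899-12-30 rather than
# 1899-12-31 compensates for Excel's phantom 1900-02-29.
EXCEL_EPOCH = date(1899, 12, 30)


def excel_serial_to_date(serial: int) -> date:
    """Convert an Excel serial day number to a calendar date."""
    return EXCEL_EPOCH + timedelta(days=serial)


# pandas equivalent of the corrected call:
#   pd.to_datetime(45712, unit="D", origin="1899-12-30")
print(excel_serial_to_date(45712))  # 2025-02-24
```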
The orchestrator (next commit) needs to turn a CSV row into a typed ForecastQuestion. Maps:
- created_date -> tz-aware UTC datetime (already parsed by load_questions)
- topic "Pathogen (Region)" -> lowercased pathogen + region
- question_text "by Month day, year" -> target_date via regex; falls back to "by Month year" giving the first of next month
- question_type + keyword hints in text -> event_type (case_count / death_count / outbreak_declared / None)
- resolution_criteria passes through
- as_of_date is a factory kwarg, not a CSV column; orchestrator passes it from --as-of-date
Tested against all 11 rows of bioscancast_questions.csv; q7 produces ForecastQuestion(id=q7, pathogen=mpox, region=world, target_date=2025-02-28, event_type=case_count, ...).
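The topic and target-date mappings above could look roughly like the following. This is a minimal sketch, not the factory's actual code: `parse_topic`, `parse_target_date`, and both regexes are hypothetical, and the behaviour for topics without a parenthesised region is an assumption.

```python
import re
from datetime import date

_MONTHS = {
    name: number
    for number, name in enumerate(
        ["january", "february", "march", "april", "may", "june", "july",
         "august", "september", "october", "november", "december"],
        start=1,
    )
}

# "by February 28, 2025" (comma optional)
_BY_DAY = re.compile(r"\bby\s+([A-Za-z]+)\s+(\d{1,2}),?\s+(\d{4})\b")
# "by February 2025"
_BY_MONTH = re.compile(r"\bby\s+([A-Za-z]+)\s+(\d{4})\b")


def parse_topic(topic):
    """Split 'Pathogen (Region)' into lowercased (pathogen, region)."""
    match = re.match(r"^\s*(.+?)\s*\((.+?)\)\s*$", topic)
    if match is None:
        # Assumption: a topic without a region keeps region=None.
        return topic.strip().lower(), None
    return match.group(1).lower(), match.group(2).lower()


def parse_target_date(text):
    """Return the deadline named in the question text, or None."""
    match = _BY_DAY.search(text)
    if match and match.group(1).lower() in _MONTHS:
        month = _MONTHS[match.group(1).lower()]
        return date(int(match.group(3)), month, int(match.group(2)))
    match = _BY_MONTH.search(text)
    if match and match.group(1).lower() in _MONTHS:
        month = _MONTHS[match.group(1).lower()]
        year = int(match.group(2))
        # Fallback: first day of the following month.
        if month == 12:
            return date(year + 1, 1, 1)
        return date(year, month + 1, 1)
    return None


print(parse_topic("Mpox (World)"))
print(parse_target_date("Will cases exceed 100,000 by February 28, 2025?"))
```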
Branch-local question fixture for the new end-to-end orchestrator's live smoke tests. Two rows:
- q7: verbatim copy of the row in bioscancast/stages/eval_stage/bioscancast_questions.csv. Resolved at 126,441 mpox cases globally by Feb 28 2025. Run with --as-of-date 2025-02-28 to exercise historical replay.
- q12: new live question on the current East Africa Ebola outbreak, target_date 2026-06-30. Run with no --as-of-date for live mode.
Kept separate from bioscancast_questions.csv so the canonical CSV stays an unmodified record of what human forecasters actually evaluated.
bioscancast/llm/pricing.py introduces:
- MODEL_PRICES: USD/1M-token snapshot dated 2026-05-27 for the models actually used by stage configs (gpt-4o-mini, gpt-4o, text-embedding-3-small/large) plus a date-pinned gpt-4o-2024-08-06 alias.
- estimate_cost(model, input_tokens, output_tokens, cached_input_tokens): computes USD spend with a 50% discount on cached prefix per OpenAI's standard prompt-cache pricing.
- estimate_cost_from_summary(): consumes the dict shape that InsightRunResult.budget_summary already produces.
Sources cited in the module docstring. Unknown model raises UnknownModelError so the orchestrator surfaces stale price tables loudly rather than under-reporting cost.
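The cost arithmetic can be sketched as below. The price table holds a single made-up entry ("example-model" with placeholder rates), not the real snapshot, and the sketch assumes cached_input_tokens is counted as a subset of input_tokens; the actual module may account for it differently.

```python
class UnknownModelError(KeyError):
    """Raised when a model has no entry in the price table."""


# Placeholder prices in USD per 1M tokens. Illustrative values only.
MODEL_PRICES = {
    "example-model": {"input": 1.00, "output": 2.00},
}

CACHED_INPUT_DISCOUNT = 0.5  # cached prefix billed at 50% of the input rate


def estimate_cost(model, input_tokens, output_tokens, cached_input_tokens=0):
    """Estimate USD spend for one model's token usage."""
    try:
        price = MODEL_PRICES[model]
    except KeyError:
        # Fail loudly instead of silently reporting $0.
        raise UnknownModelError(model) from None
    uncached = input_tokens - cached_input_tokens
    total = (
        uncached * price["input"]
        + cached_input_tokens * price["input"] * CACHED_INPUT_DISCOUNT
        + output_tokens * price["output"]
    )
    return total / 1_000_000


print(estimate_cost("example-model", 1_000_000, 500_000, 400_000))
```

With the placeholder rates, 600,000 uncached input tokens, 400,000 cached input tokens, and 500,000 output tokens come to 0.60 + 0.20 + 1.00 = 1.80 USD.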
New module with the run-directory layout
(data/runs/{question_id}/{run_id}/) and per-stage JSON dump helpers:
save_question / save_search / save_filtered / save_documents /
save_insight / save_manifest. _json_default and the asdict pattern are
lifted from scripts/eval_insight_on_real_docs.py so the orchestrator
and the eval harness share serialization conventions.
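A minimal sketch of the run-directory layout and a per-stage dump helper, assuming dataclass payloads converted with asdict. The names `make_run_dir`, `save_stage`, and `DemoQuestion` are hypothetical stand-ins, and the `_json_default` here handles only dates, which is narrower than the real helper.

```python
import json
from dataclasses import asdict, dataclass
from datetime import date
from pathlib import Path


def make_run_dir(data_root, question_id, run_id):
    """Create and return <data_root>/runs/<question_id>/<run_id>/."""
    path = Path(data_root) / "runs" / question_id / run_id
    path.mkdir(parents=True, exist_ok=True)
    return path


def _json_default(obj):
    """Fallback for values json cannot encode natively."""
    if isinstance(obj, date):  # datetime is a subclass of date
        return obj.isoformat()
    raise TypeError(f"not JSON serializable: {type(obj).__name__}")


def save_stage(run_dir, name, payload):
    """Write one stage's payload to <run_dir>/<name>.json."""
    out = run_dir / f"{name}.json"
    out.write_text(
        json.dumps(payload, default=_json_default, indent=2),
        encoding="utf-8",
    )
    return out


@dataclass
class DemoQuestion:  # stand-in for ForecastQuestion
    id: str
    target_date: date


# Usage:
#   run_dir = make_run_dir("data", "q7", "demo-run")
#   save_stage(run_dir, "question", asdict(DemoQuestion("q7", date(2025, 2, 28))))
```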
Replaces the 14-line commented sketch with a real argparse-driven
orchestrator that chains all four stages for a single ForecastQuestion:
python -m bioscancast.main q7 --as-of-date 2025-02-28 -v
Pipeline:
1. load_question_by_id reads the CSV row and builds a ForecastQuestion
via the new factory (applying any CLI overrides).
2. SearchStagePipeline runs with a usage-tracking LLM wrapper so
search + filter token usage is accumulated for cost reporting.
3. FilteringPipeline reuses the same wrapped client.
4. ExtractionPipeline gets as_of_date=question.as_of_date so the
fetcher uses Wayback snapshots in historical-replay mode.
5. InsightPipeline receives the raw (unwrapped) client; its
BudgetTracker already tracks usage, so wrapping would double-count.
After all stages, search/filter usage and insight per_model usage are
merged and fed to bioscancast.llm.pricing.estimate_cost for a single
USD figure in the final epilogue and manifest.
Persistence:
data/runs/{question_id}/{run_id}/
question.json, search.json, filtered.json, documents.json,
insight.json, manifest.json
The manifest is rewritten after every stage so a crashed run keeps
partial timings + config. On any stage exception the manifest pins
the failing stage and re-raises wrapped in PipelineError; main()
exits 1.
Empty intermediate output is not an error - logged and passed through
(the insight stage already handles zero documents).
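The manifest-after-every-stage and error-wrapping behaviour described above can be sketched as follows. This is an illustration under assumptions: `run_stages`, the manifest keys, and the stage callables are hypothetical, and the real run_pipeline wires up the actual stage classes.

```python
import json
import time
from pathlib import Path


class PipelineError(RuntimeError):
    """Wraps a stage failure and records which stage failed."""

    def __init__(self, stage, cause):
        super().__init__(f"stage {stage!r} failed: {cause}")
        self.stage = stage


def run_stages(stages, run_dir):
    """Run (name, callable) pairs in order, rewriting the manifest each time."""
    run_dir = Path(run_dir)
    run_dir.mkdir(parents=True, exist_ok=True)
    manifest_path = run_dir / "manifest.json"
    manifest = {"timings_s": {}, "failed_stage": None}
    data = None
    for name, stage in stages:
        start = time.perf_counter()
        try:
            data = stage(data)
        except Exception as exc:
            # Pin the failing stage before re-raising so partial
            # timings survive the crash.
            manifest["failed_stage"] = name
            manifest_path.write_text(json.dumps(manifest, indent=2))
            raise PipelineError(name, exc) from exc
        manifest["timings_s"][name] = round(time.perf_counter() - start, 3)
        manifest_path.write_text(json.dumps(manifest, indent=2))
    return data
```

A caller such as main() would catch PipelineError, log it, and exit with status 1.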
Thin scripts/run_pipeline.py wrapper around bioscancast.main:main so the
new orchestrator follows the same `scripts/run_*.py` convention as the
existing per-stage runners. Both invocations are equivalent:
python -m bioscancast.main q7
python scripts/run_pipeline.py q7
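For reference, the CLI surface shared by both invocations might be declared like this. Only the positional question id, --as-of-date, and -v appear in the commit messages; the `--verbose` long form, the ISO-date parsing, and the `build_parser` name are assumptions.

```python
import argparse
from datetime import date


def build_parser():
    """Build an argument parser mirroring the documented invocations."""
    parser = argparse.ArgumentParser(prog="bioscancast.main")
    parser.add_argument("question_id", help="e.g. q7")
    parser.add_argument(
        "--as-of-date",
        type=date.fromisoformat,  # assumption: ISO YYYY-MM-DD input
        default=None,
        help="historical-replay cutoff; omit for live mode",
    )
    parser.add_argument("-v", "--verbose", action="store_true")
    return parser


args = build_parser().parse_args(["q7", "--as-of-date", "2025-02-28", "-v"])
print(args.question_id, args.as_of_date, args.verbose)
```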
data/runs/ added to .gitignore so per-run artifacts (some quite large -
documents.json includes every chunk text) don't pollute the repo.
Two small fixes uncovered by the first live runs of the orchestrator:
1. persistence._json_default crashed on FILTER_CONFIG's set values (blocked_domains, low_value_url_keywords, etc.) when serializing the manifest. Now sorts sets to lists.
2. pricing.MODEL_PRICES needs the dated aliases OpenAI returns in response.model. A request to "gpt-4o-mini" comes back tagged "gpt-4o-mini-2024-07-18", which was missing from the table and produced a $0 cost estimate with a noisy warning. Added that alias plus a couple of known gpt-4o dated variants.
q7 historical-replay run subsequently cost $0.0030, q12 live run cost $0.0049, both correctly reported in the manifest.
The epilogue previously printed a single total-cost figure. This splits cost by stage (search / filter / insight) so a cost spike in any one stage is visible at a glance during iteration.
Mechanism: _UsageTrackingClient gains a snapshot() method; run_pipeline snapshots the shared tracker after search and after filter and diffs them (_usage_delta) to attribute usage to each stage. Insight reports its own budget_summary per_model as before. The extract stage makes no LLM calls, so it shows timing only.
Manifest gains stage_usage and stage_costs_usd alongside the existing combined_usage and estimated_cost_usd. Epilogue now reads e.g.:
    search   23.87s  $0.0009
    filter    3.06s  $0.0003
    extract  59.69s
    insight  12.28s  $0.0030
    total cost: $0.0042
Implements item 11 from the roadmap. 447 tests still passing.
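The snapshot-and-diff mechanism can be illustrated with a stripped-down tracker. `UsageTracker` and `usage_delta` are hypothetical stand-ins for _UsageTrackingClient and _usage_delta, and the flat two-key usage dict is an assumption; the real tracker may break usage down per model.

```python
class UsageTracker:
    """Accumulates token usage across every call made through it."""

    def __init__(self):
        self._totals = {"input_tokens": 0, "output_tokens": 0}

    def record(self, input_tokens, output_tokens):
        self._totals["input_tokens"] += input_tokens
        self._totals["output_tokens"] += output_tokens

    def snapshot(self):
        # Return a copy so later calls cannot mutate an old snapshot.
        return dict(self._totals)


def usage_delta(before, after):
    """Usage accrued between two snapshots of the same tracker."""
    return {key: after[key] - before.get(key, 0) for key in after}


tracker = UsageTracker()
start = tracker.snapshot()
tracker.record(1200, 300)           # calls made during search
after_search = tracker.snapshot()
tracker.record(400, 50)             # calls made during filter
after_filter = tracker.snapshot()

search_usage = usage_delta(start, after_search)
filter_usage = usage_delta(after_search, after_filter)
print(search_usage, filter_usage)
```

Because search and filter share one wrapped client, diffing consecutive snapshots is what separates their usage without needing a tracker per stage.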
This was referenced Jun 3, 2026
End-to-end pipeline orchestrator
Summary
Adds the first orchestrator that chains all four pipeline stages (search → filter → extract → insight) for a single `ForecastQuestion`.
Before this branch the stages only ran in isolation: `bioscancast/main.py` was a commented sketch, `scripts/run_insight.py` was synthetic-only, and `scripts/eval_insight_on_real_docs.py` chained only extract→insight on hardcoded fixtures. Forecasting (the next stage) needs a real `InsightRecord` stream; without this orchestrator it would have had to build the chain itself, coupling the two stages.
What's included
- `bioscancast/main.py`: `run_pipeline()` + argparse CLI. Chains the four stages with per-stage timing, JSON persistence, error wrapping (`PipelineError` pins the failing stage), and a cost estimate. Runnable as `python -m bioscancast.main q7 …` or via `scripts/run_pipeline.py`.
- `bioscancast/orchestration/`: new package containing `persistence.py` (run-dir layout `data/runs/{qid}/{run_id}/`, per-stage JSON dumps, manifest) and `test_questions.csv` (q7 verbatim copy + a new q12 Ebola question; the canonical `bioscancast_questions.csv` is left untouched as the human-forecaster record).
- `bioscancast/stages/eval_stage/loaders.py`: `build_forecast_question` factory (CSV row → `ForecastQuestion`) + `load_question_by_id`; fixed an Excel-serial `created_date` parsing bug while here.
- `bioscancast/llm/pricing.py`: model price table (snapshot 2026-05-27) + `estimate_cost`; surfaces USD cost per run, broken down per stage in the orchestrator epilogue.
Verification
- `python -m pytest bioscancast/tests/`: 447 passed, 2 skipped (live). Tests cover the orchestrator's stage timing/persistence, the `build_forecast_question` factory, and the per-stage cost line-items.
- … live) end-to-end runs producing artifacts inspected for filter survival, records, and cost. Cumulative API spend across all development runs: ~$0.03.
Not in scope
belong in forecasting, not insight.