diff --git a/justfile b/justfile index 71d16116..1d0063d8 100644 --- a/justfile +++ b/justfile @@ -183,6 +183,30 @@ research-community provider target *args="": # Alias for repo-specific entity research. research-entity provider target *args="": (research-community provider target args) +# Edison Scientific deep research (PaperQA3) for one community record. +# target = filename stem, CommunityMech id, or YAML path. +# Examples: +# just research-community-edison Yogurt_TwoSpecies_Starter_Culture --dry-run +# just research-community-edison CommunityMech:000164 --job literature-high +research-community-edison target *args="": + uv run --extra dev python scripts/research_community_edison.py \ + --target {{target}} \ + --template {{templates_dir}}/community_mechanism_research.md \ + --out-dir {{research_dir}}/communities \ + {{args}} + +# Edison deep research for a batch of communities (JSON list of stems/ids/paths). +research-community-edison-batch batch *args="": + uv run --extra dev python scripts/research_community_edison.py \ + --batch {{batch}} \ + --template {{templates_dir}}/community_mechanism_research.md \ + --out-dir {{research_dir}}/communities \ + {{args}} + +# Retroactively backfill Edison provenance sidecars (no re-billing). +enrich-edison-response *args="": + uv run --extra dev python scripts/enrich_edison_response.py {{args}} + # List available deep-research-client providers. research-providers: #!/usr/bin/env bash diff --git a/pyproject.toml b/pyproject.toml index 7b4b68ea..90887445 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,10 @@ dev = [ "mypy>=1.0.0", "pre-commit>=3.0.0", "deep-research-client[cyberian]>=0.2.4; python_version >= '3.12'", + # Used by scripts/research_community_edison.py — drives the Edison + # Scientific SDK directly (PaperQA is not a registered DRC provider). + "edison-client>=0.11.0; python_version >= '3.12'", + "python-dotenv>=1.0.0", "types-PyYAML>=6.0.0", "types-requests>=2.31.0", "types-tqdm>=4.66.0", diff --git a/scripts/_edison_capture.py b/scripts/_edison_capture.py new file mode 100644 index 00000000..91aa98af --- /dev/null +++ b/scripts/_edison_capture.py @@ -0,0 +1,503 @@ +"""Shared Edison-response capture helpers for the deep-research scripts. + +Goal: when we spend credits on an Edison job, we want to save every +useful piece of the response — answer, formatted answer, reasoning, +agent-state trace, citations, generated files, and the full raw +response — so the work is reusable for provenance, audit, debugging, +and downstream curation without re-querying. + +Files written per task (under ``out_dir``): + + {stem}.md Primary human-readable answer + (formatted_answer if present, else answer). + {stem}-meta.yaml Compact provenance summary (task_id, + status, costs, query, template_vars, ...). + Always written. + {stem}-response.json Full ``response.model_dump(mode='json')``. + Every SDK-exposed field, future-proof + against new ones. + {stem}-citations.md Parsed reference list from + ``formatted_answer`` (PaperQA convention, + matches the falcon citations.md sidecar). + {stem}-agent-state.json ``agent_state`` (tool-call trace) + + ``environment_frame`` + verbose + ``metadata``. Only written when verbose + fetch yields any of those fields. + {stem}-files.json ``client.list_files(trajectory_id)`` + output — artifacts Edison may have + generated during the run. Empty list + written when none are present, so the + existence of the file means "we asked". + +where ``{stem}`` is the slug the caller chose (e.g. +``archaeoglobus_medium_dsm_399-edison-literature``). + +The ``capture_full_response`` entry point handles all of the above and +returns the meta dict the caller should also yaml-dump as +``{stem}-meta.yaml``. Idempotent: re-invoking overwrites; existing +sibling files are not read. +""" +from __future__ import annotations + +import hashlib +import json +import re +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +# -- citation extraction -------------------------------------------------- + +_NUMBERED_REF_RE = re.compile(r"^\s*(\d+)\.\s+(.+?)\s*$", re.MULTILINE) +_URL_RE = re.compile(r"https?://[^\s<>)\]]+", re.IGNORECASE) +_PMID_RE = re.compile(r"\bPMID[:\s]+(\d+)", re.IGNORECASE) +_DOI_RE = re.compile(r"\b(?:doi[:\s]+|https?://(?:dx\.)?doi\.org/)(10\.[^\s<>)\]]+)", + re.IGNORECASE) + + +def parse_citations(formatted_answer: str | None) -> list[dict[str, Any]]: + """Best-effort citation extraction from a PaperQA ``formatted_answer``. + + PaperQA's answer text typically ends with a ``References`` section + using either a numbered list (``1. Author2020paper pages 1-3``) or + a free-text block. This pulls both numbered entries and any + PMID/DOI/URL hits, deduping by reference text and identifier. + + Returns a list of dicts with optional keys: ``ordinal``, ``text``, + ``pmid``, ``doi``, ``urls``. Each entry corresponds to one + distinct citation. Empty list when the input is empty or no + references parse out — callers should still write the file so + "we parsed but found nothing" is distinguishable from "we never + tried". + """ + if not formatted_answer: + return [] + + # PaperQA convention: References section starts at "References" or + # the first numbered line. Look for the last occurrence of the + # word "References" (case-insensitive) and treat everything after + # it as the reference block; fall back to scanning the full text. + text = formatted_answer + m = re.search(r"\n\s*references\s*\n", text, re.IGNORECASE) + block = text[m.end():] if m else text + + out: list[dict[str, Any]] = [] + seen_keys: set[str] = set() + for match in _NUMBERED_REF_RE.finditer(block): + ordinal = int(match.group(1)) + body = match.group(2).strip() + if not body: + continue + entry: dict[str, Any] = {"ordinal": ordinal, "text": body} + pmid_m = _PMID_RE.search(body) + if pmid_m: + entry["pmid"] = pmid_m.group(1) + doi_m = _DOI_RE.search(body) + if doi_m: + entry["doi"] = doi_m.group(1).rstrip(".,;)") + urls = _URL_RE.findall(body) + if urls: + entry["urls"] = [u.rstrip(".,;)") for u in urls] + key = entry.get("pmid") or entry.get("doi") or body + if key in seen_keys: + continue + seen_keys.add(key) + out.append(entry) + + # If the numbered scan caught nothing, fall back to whatever URLs + # appear in the answer body — better something than nothing. + if not out: + for url in dict.fromkeys(_URL_RE.findall(formatted_answer)): + key = url.rstrip(".,;)") + if key in seen_keys: + continue + seen_keys.add(key) + out.append({"text": url, "urls": [key]}) + return out + + +def render_citations_md(citations: list[dict[str, Any]], + query: str | None = None) -> str: + """Render the parsed citations as a human-readable markdown sidecar.""" + lines: list[str] = ["# Citations", ""] + if query: + lines.append(f"_From a query of {len(query)} chars (see `*-meta.yaml`)._") + lines.append("") + if not citations: + lines.append("_No citations parsed._") + return "\n".join(lines) + "\n" + for c in citations: + bits: list[str] = [] + if "ordinal" in c: + bits.append(f"**{c['ordinal']}.**") + bits.append(c.get("text", "(no text)")) + ids: list[str] = [] + if "pmid" in c: + ids.append(f"PMID:{c['pmid']}") + if "doi" in c: + ids.append(f"doi:{c['doi']}") + if ids: + bits.append("(" + ", ".join(ids) + ")") + lines.append("- " + " ".join(bits)) + for url in c.get("urls", []): + lines.append(f" - <{url}>") + return "\n".join(lines) + "\n" + + +# -- helpers -------------------------------------------------------------- + +def query_sha256(query: str) -> str: + """Stable hash of the rendered query — useful for dedup / cache keys.""" + return hashlib.sha256(query.encode("utf-8")).hexdigest() + + +def _safe_model_dump(obj: Any) -> Any: + """Best-effort serialization of a pydantic response (or anything). + + Returns the JSON-mode model dump when ``obj`` is a pydantic model; + falls back to ``__dict__`` or repr. + """ + if obj is None: + return None + if hasattr(obj, "model_dump"): + try: + return obj.model_dump(mode="json") + except Exception: # pylint: disable=broad-except + try: + return obj.model_dump() + except Exception: + pass + if hasattr(obj, "__dict__"): + return {k: _safe_model_dump(v) for k, v in obj.__dict__.items() + if not k.startswith("_")} + if isinstance(obj, (list, tuple)): + return [_safe_model_dump(v) for v in obj] + if isinstance(obj, dict): + return {k: _safe_model_dump(v) for k, v in obj.items()} + try: + json.dumps(obj) + return obj + except (TypeError, ValueError): + return repr(obj) + + +def _get_attr(response: Any, name: str, default: Any = None) -> Any: + return getattr(response, name, default) + + +def _fetch_verbose(client: Any, task_id: str) -> Any | None: + """Pull the verbose response for a completed task. + + No new compute on the Edison side — this is a metadata fetch from + cached task results. Returns ``None`` and swallows errors so a + verbose-fetch failure never breaks the primary capture path. + """ + if not client or not task_id: + return None + try: + return client.get_task(task_id=task_id, verbose=True) + except Exception: # pylint: disable=broad-except + return None + + +def _try_list_files(client: Any, task_id: str) -> Any: + """Pull the artifact-file list for a task. Returns None on failure.""" + if not client or not task_id: + return None + try: + return client.list_files(trajectory_id=task_id) + except Exception: # pylint: disable=broad-except + return None + + +# Internal PaperQA artifacts we never want to download (giant pickles +# carrying the indexed corpus — useful to PaperQA, not to curators). +_INTERNAL_ARTIFACT_PREFIXES = ("pqa:",) + +# Hard cap on per-artifact size; protects against accidentally pulling +# multi-GB binaries. Curation tables are typically < 100 KB. +_DEFAULT_ARTIFACT_MAX_BYTES = 2_000_000 + + +def _safe_artifact_name(raw_name: str) -> str: + """Sanitize an Edison data-storage name for use as a filename.""" + safe = "".join(c if c.isalnum() or c in "._- " else "_" + for c in raw_name).strip().rstrip(".") + safe = "_".join(safe.split()) # collapse whitespace + return safe[:120] or "artifact" + + +def _fetch_artifact_content(client: Any, data_storage_id: str) -> tuple[str, str | bytes] | None: + """Fetch one artifact's content. Returns (ext, content) or None on failure. + + The SDK may return either a ``Path`` (GCS-backed, possibly extracted + zip) or a ``RawFetchResponse``-like object with a ``.content`` + attribute. Both are handled — we always write the raw payload as + text when possible, falling back to bytes. + """ + try: + result = client.fetch_data_from_storage(data_storage_id=data_storage_id) + except Exception: # pylint: disable=broad-except + return None + if result is None: + return None + # RawFetchResponse-ish: has .content + if hasattr(result, "content"): + content = getattr(result, "content") + if isinstance(content, bytes): + try: + return ("txt", content.decode("utf-8")) + except UnicodeDecodeError: + return ("bin", content) + return ("txt", str(content)) + # Path-ish (single file): read it + if hasattr(result, "read_text"): + try: + return ("txt", result.read_text()) + except (UnicodeDecodeError, OSError): + try: + return ("bin", result.read_bytes()) + except OSError: + return None + # list[Path]: write index plus contents in order + if isinstance(result, list): + parts: list[str] = [] + for i, p in enumerate(result): + if hasattr(p, "read_text"): + try: + parts.append(f"# file {i}: {p.name if hasattr(p, 'name') else p}\n" + + p.read_text()) + except Exception: # pylint: disable=broad-except + parts.append(f"# file {i}: ") + return ("txt", "\n\n".join(parts)) + return None + + +def fetch_named_artifacts( + *, + client: Any, + files_listing: Any, + out_dir: Path, + stem: str, + max_bytes: int = _DEFAULT_ARTIFACT_MAX_BYTES, +) -> list[dict[str, Any]]: + """Download named curation artifacts referenced by ``list_files``. + + Writes per-artifact files under ``{stem}-artifacts/`` and returns a + manifest summarizing what was fetched, skipped, or failed. The + manifest is intentionally compact — full per-artifact metadata is + already in ``{stem}-files.json``. + + Skips: + - PaperQA internal artifacts (name prefix ``pqa:`` — large pickles). + - Artifacts whose declared size exceeds ``max_bytes``. + + Idempotent: an existing file in the artifacts dir is overwritten + (Edison artifacts are immutable per task, so re-fetching is fine). + """ + manifest: list[dict[str, Any]] = [] + if files_listing is None or client is None: + return manifest + + # Normalize: list_files may return a dict with 'data', a list, or a + # pydantic-y object — pull the iterable of file entries. + entries: list[Any] + if isinstance(files_listing, dict) and isinstance(files_listing.get("data"), list): + entries = files_listing["data"] + elif isinstance(files_listing, list): + entries = files_listing + else: + return manifest + + artifacts_dir = out_dir / f"{stem}-artifacts" + fetched_any = False + for entry in entries: + ds = entry.get("data_storage") if isinstance(entry, dict) else None + if not isinstance(ds, dict): + continue + ds_id = ds.get("id") or entry.get("data_storage_id") + name = ds.get("name") or "artifact" + if not ds_id: + continue + # Skip internal PaperQA cache + if any(name.lower().startswith(p) for p in _INTERNAL_ARTIFACT_PREFIXES): + manifest.append({"name": name, "id": str(ds_id), "status": "skipped-internal"}) + continue + size = (ds.get("metadata") or {}).get("size") + if isinstance(size, int) and size > max_bytes: + manifest.append({"name": name, "id": str(ds_id), + "status": "skipped-too-large", "size": size}) + continue + fetched = _fetch_artifact_content(client, str(ds_id)) + if fetched is None: + manifest.append({"name": name, "id": str(ds_id), "status": "fetch-failed"}) + continue + ext, content = fetched + safe = _safe_artifact_name(name) + out_path = artifacts_dir / f"{safe}.{ext}" + if not fetched_any: + artifacts_dir.mkdir(parents=True, exist_ok=True) + fetched_any = True + if isinstance(content, bytes): + out_path.write_bytes(content) + else: + out_path.write_text(content) + manifest.append({"name": name, "id": str(ds_id), "status": "fetched", + "path": str(out_path.relative_to(out_dir)), + "chars": len(content) if isinstance(content, str) else None}) + return manifest + + +# -- main entry point ----------------------------------------------------- + +def capture_full_response( + *, + response: Any, + client: Any, + out_dir: Path, + stem: str, + query: str, + base_meta: dict[str, Any], +) -> dict[str, Any]: + """Persist every reusable piece of an Edison response. + + Writes the sibling files documented at the top of this module and + returns the final meta dict the caller should yaml-dump as + ``{stem}-meta.yaml``. The caller still owns writing the meta yaml + so that the meta file format stays in one place (yaml.safe_dump + knobs may vary). + + ``response`` is the SDK response object (``run_tasks_until_done`` + returns a list; pass one element here). + + ``client`` is the live ``EdisonClient`` instance. Pass ``None`` to + skip the verbose-and-files fetch (e.g. in dry-run modes). + + ``base_meta`` is the caller's pre-built meta dict (slug, media + path, template path, etc.); fields are merged with the new + response-derived fields, with response-derived fields winning. + """ + out_dir.mkdir(parents=True, exist_ok=True) + + md_path = out_dir / f"{stem}.md" + response_json_path = out_dir / f"{stem}-response.json" + citations_md_path = out_dir / f"{stem}-citations.md" + agent_state_path = out_dir / f"{stem}-agent-state.json" + files_path = out_dir / f"{stem}-files.json" + + # Primary answer markdown + answer = _get_attr(response, "answer") + formatted_answer = _get_attr(response, "formatted_answer") + answer_reasoning = _get_attr(response, "answer_reasoning") + body = formatted_answer or answer or "(no answer field on this job's response type)" + md_path.write_text(body) + + # Full raw response dump (every SDK field, future-proof) + response_dump = _safe_model_dump(response) + response_json_path.write_text(json.dumps(response_dump, indent=2, default=str)) + + # Citations sidecar + citations = parse_citations(formatted_answer or answer) + citations_md_path.write_text(render_citations_md(citations, query=query)) + + # Verbose + files via secondary fetches (no extra billing) + task_id = str(_get_attr(response, "task_id") or "") + verbose = _fetch_verbose(client, task_id) if task_id else None + if verbose is not None: + agent_state = _safe_model_dump(_get_attr(verbose, "agent_state")) + environment_frame = _safe_model_dump(_get_attr(verbose, "environment_frame")) + verbose_metadata = _safe_model_dump(_get_attr(verbose, "metadata")) + if any(x is not None for x in (agent_state, environment_frame, verbose_metadata)): + agent_state_path.write_text(json.dumps({ + "task_id": task_id, + "agent_state": agent_state, + "environment_frame": environment_frame, + "metadata": verbose_metadata, + }, indent=2, default=str)) + + files_listing = _try_list_files(client, task_id) if task_id else None + artifacts_manifest: list[dict[str, Any]] = [] + if files_listing is not None: + files_path.write_text(json.dumps(_safe_model_dump(files_listing), + indent=2, default=str)) + # Pull the actual content of named curation artifacts (small, + # not internal PaperQA pickles) into a sibling artifacts/ dir. + artifacts_manifest = fetch_named_artifacts( + client=client, + files_listing=_safe_model_dump(files_listing), + out_dir=out_dir, + stem=stem, + ) + + # Build the meta dict the caller will yaml-dump + meta: dict[str, Any] = dict(base_meta) + meta.update({ + "task_id": task_id, + "status": _get_attr(response, "status"), + "job_name": _get_attr(response, "job_name"), + "user": _get_attr(response, "user"), + "agent_name": _get_attr(response, "agent_name"), + "build_owner": _get_attr(response, "build_owner"), + "environment_name": _get_attr(response, "environment_name"), + "project_id": str(_get_attr(response, "project_id") or "") or None, + "share_status": _get_attr(response, "share_status"), + "created_at": _to_iso(_get_attr(response, "created_at")), + "answer_received_at": datetime.now(timezone.utc).isoformat(), + "total_cost": _get_attr(response, "total_cost"), + "total_queries": _get_attr(response, "total_queries"), + "has_successful_answer": _get_attr(response, "has_successful_answer"), + "has_answer_reasoning": bool(answer_reasoning), + "answer_chars": len(answer or ""), + "formatted_answer_chars": len(formatted_answer or ""), + "answer_reasoning_chars": len(answer_reasoning or ""), + "citations_parsed": len(citations), + "query_sha256": query_sha256(query), + "sidecar_files": _existing_sidecars(out_dir, stem), + "artifacts_fetched": [a for a in artifacts_manifest if a.get("status") == "fetched"], + "artifacts_skipped": [a for a in artifacts_manifest if a.get("status") != "fetched"], + }) + return meta + + +def capture_dry_run( + *, + out_dir: Path, + stem: str, + query: str, + base_meta: dict[str, Any], +) -> dict[str, Any]: + """Dry-run capture: write meta-only artifacts; never write the .md. + + Used by the ``--dry-run`` flag in both edison scripts so prompts + are auditable without spending credits. + """ + out_dir.mkdir(parents=True, exist_ok=True) + meta: dict[str, Any] = dict(base_meta) + meta.update({ + "status": "dry-run", + "query_chars": len(query), + "query": query, + "query_sha256": query_sha256(query), + }) + return meta + + +def _existing_sidecars(out_dir: Path, stem: str) -> dict[str, bool]: + """Snapshot which sidecar files exist for this stem — for the meta.""" + return { + "answer_md": (out_dir / f"{stem}.md").exists(), + "response_json": (out_dir / f"{stem}-response.json").exists(), + "citations_md": (out_dir / f"{stem}-citations.md").exists(), + "agent_state_json": (out_dir / f"{stem}-agent-state.json").exists(), + "files_json": (out_dir / f"{stem}-files.json").exists(), + } + + +def _to_iso(dt: Any) -> str | None: + """ISO-format a datetime or return the str it already is.""" + if dt is None: + return None + if hasattr(dt, "isoformat"): + return dt.isoformat() + return str(dt) diff --git a/scripts/enrich_edison_response.py b/scripts/enrich_edison_response.py new file mode 100644 index 00000000..b32535e3 --- /dev/null +++ b/scripts/enrich_edison_response.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +"""Retroactively enrich Edison research outputs with full provenance. + +For each ``research/communities/*-edison-*-meta.yaml`` carrying a real +``task_id`` (i.e. status != "dry-run"), pull the verbose response via +``client.get_task(task_id, verbose=True)`` and any artifact-file +listing via ``client.list_files(trajectory_id)`` and write the +missing sidecar files: + + {stem}-response.json full response.model_dump + {stem}-citations.md parsed reference list + {stem}-agent-state.json tool-call trace + env frame + metadata + {stem}-files.json artifact-file inventory + +These secondary calls are **metadata-only** — no Edison compute is +re-billed; Edison stores task results and serves them back. + +Use cases: + - You ran phase-1 with an older script that only saved the answer + md + a sparse meta; this backfills everything you originally + missed. + - A live run partially completed and you want to confirm exactly + what came back without re-running. + - You want the full agent-state trace for debugging a surprising + answer. + +Usage:: + + # Enrich every meta yaml that's missing sidecars + python scripts/enrich_edison_response.py + + # Limit to a specific community / pattern + python scripts/enrich_edison_response.py --pattern 'Yogurt_*' + + # Force re-write of sidecars even if they already exist + python scripts/enrich_edison_response.py --force + + # See what would happen without making any API calls + python scripts/enrich_edison_response.py --dry-run +""" +from __future__ import annotations + +import argparse +import json +import sys +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import yaml + +REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(REPO_ROOT / "scripts")) +import _edison_capture as ec # noqa: E402 +import research_community_edison as rce # noqa: E402 -- reuse load_api_key + +DEFAULT_RESEARCH_DIR = REPO_ROOT / "research" / "communities" + + +def _strip_meta_suffix(name: str) -> str: + """Drop the ``-meta.yaml`` tail to get the stem the capture helper used.""" + if name.endswith("-meta.yaml"): + return name[: -len("-meta.yaml")] + if name.endswith(".yaml"): + return name[: -len(".yaml")] + return name + + +def needs_enrichment(out_dir: Path, stem: str, *, force: bool) -> dict[str, bool]: + """Return a dict of which sidecars are missing for this stem.""" + return { + "response_json": force or not (out_dir / f"{stem}-response.json").exists(), + "citations_md": force or not (out_dir / f"{stem}-citations.md").exists(), + "agent_state_json": force or not (out_dir / f"{stem}-agent-state.json").exists(), + "files_json": force or not (out_dir / f"{stem}-files.json").exists(), + "answer_md": force or not (out_dir / f"{stem}.md").exists(), + } + + +def enrich_one(client: Any, meta_path: Path, *, force: bool, dry_run: bool) -> dict[str, Any]: + """Pull verbose + files for one meta. Returns a stats dict.""" + meta = yaml.safe_load(meta_path.read_text()) + if not isinstance(meta, dict): + return {"path": str(meta_path), "status": "invalid-meta", "wrote": []} + + status = str(meta.get("status") or "").lower() + task_id = str(meta.get("task_id") or "") + if status == "dry-run" or not task_id: + return {"path": str(meta_path), "status": "skipped-no-task-id", "wrote": []} + + stem = _strip_meta_suffix(meta_path.name) + out_dir = meta_path.parent + missing = needs_enrichment(out_dir, stem, force=force) + if not any(missing.values()): + return {"path": str(meta_path), "status": "already-complete", "wrote": []} + + print( + f" + enriching {stem} (missing: " f"{[k for k, v in missing.items() if v]})", flush=True + ) + + if dry_run: + return { + "path": str(meta_path), + "status": "dry-run", + "would_write": [k for k, v in missing.items() if v], + } + + wrote: list[str] = [] + + # Verbose fetch covers: response.json (using verbose payload as + # canonical), agent-state.json, and lets us refresh meta fields + # we may have skipped originally (answer_reasoning, etc.). + try: + verbose = client.get_task(task_id=task_id, verbose=True) + except Exception as exc: # pylint: disable=broad-except + print(f" ! verbose fetch failed: {exc}", file=sys.stderr) + verbose = None + + # Always try the non-verbose response too for answer/formatted_answer/cost + # (some response classes carry these only in the non-verbose form). + try: + normal = client.get_task(task_id=task_id, verbose=False) + except Exception as exc: # pylint: disable=broad-except + print(f" ! normal fetch failed: {exc}", file=sys.stderr) + normal = None + + primary = normal or verbose + if primary is None: + return {"path": str(meta_path), "status": "fetch-failed", "wrote": []} + + # Re-derive answer body and write the .md if missing + answer = getattr(primary, "answer", None) + formatted_answer = getattr(primary, "formatted_answer", None) + if missing["answer_md"]: + body = formatted_answer or answer or "(no answer field on this job's response type)" + (out_dir / f"{stem}.md").write_text(body) + wrote.append("answer_md") + + if missing["response_json"]: + merged: dict[str, Any] = {} + if normal is not None: + merged["response"] = ec._safe_model_dump(normal) # pylint: disable=protected-access + if verbose is not None: + merged["verbose"] = ec._safe_model_dump(verbose) # pylint: disable=protected-access + (out_dir / f"{stem}-response.json").write_text(json.dumps(merged, indent=2, default=str)) + wrote.append("response_json") + + if missing["citations_md"]: + citations = ec.parse_citations(formatted_answer or answer) + query = str(meta.get("query") or "") + (out_dir / f"{stem}-citations.md").write_text( + ec.render_citations_md(citations, query=query) + ) + wrote.append("citations_md") + + if missing["agent_state_json"] and verbose is not None: + agent_state = ec._safe_model_dump(getattr(verbose, "agent_state", None)) # pylint: disable=protected-access + environment_frame = ec._safe_model_dump(getattr(verbose, "environment_frame", None)) # pylint: disable=protected-access + verbose_metadata = ec._safe_model_dump(getattr(verbose, "metadata", None)) # pylint: disable=protected-access + if any(x is not None for x in (agent_state, environment_frame, verbose_metadata)): + (out_dir / f"{stem}-agent-state.json").write_text( + json.dumps( + { + "task_id": task_id, + "agent_state": agent_state, + "environment_frame": environment_frame, + "metadata": verbose_metadata, + }, + indent=2, + default=str, + ) + ) + wrote.append("agent_state_json") + + files_listing = None + if missing["files_json"]: + try: + files_listing = client.list_files(trajectory_id=task_id) + except Exception as exc: # pylint: disable=broad-except + print(f" ! list_files failed: {exc}", file=sys.stderr) + files_listing = None + if files_listing is not None: + (out_dir / f"{stem}-files.json").write_text( + json.dumps(ec._safe_model_dump(files_listing), indent=2, default=str) # pylint: disable=protected-access + ) + wrote.append("files_json") + # Pull named curation artifacts (separate from the files.json + # inventory). Always attempt, even when files.json already exists, + # so re-running picks up artifacts that were missed before. + artifacts_manifest: list[dict[str, Any]] = [] + if files_listing is None and (out_dir / f"{stem}-files.json").exists(): + try: + files_listing = json.loads((out_dir / f"{stem}-files.json").read_text()) + except (OSError, json.JSONDecodeError): + files_listing = None + if files_listing is not None: + artifacts_manifest = ec.fetch_named_artifacts( + client=client, + files_listing=ec._safe_model_dump(files_listing), # pylint: disable=protected-access + out_dir=out_dir, + stem=stem, + ) + if any(a.get("status") == "fetched" for a in artifacts_manifest): + wrote.append("artifacts") + + # Refresh the meta yaml with the now-richer field set + updates: dict[str, Any] = { + "answer_chars": len(answer or ""), + "formatted_answer_chars": len(formatted_answer or ""), + "has_answer_reasoning": bool(getattr(primary, "answer_reasoning", None)), + "answer_reasoning_chars": len(getattr(primary, "answer_reasoning", "") or ""), + "citations_parsed": len(ec.parse_citations(formatted_answer or answer)), + "sidecar_files": ec._existing_sidecars(out_dir, stem), # pylint: disable=protected-access + "artifacts_fetched": [a for a in artifacts_manifest if a.get("status") == "fetched"], + "artifacts_skipped": [a for a in artifacts_manifest if a.get("status") != "fetched"], + "enriched_at": ec._to_iso(datetime.now(timezone.utc)), + } + # Preserve existing query_sha256, but stamp it in if missing. + if not meta.get("query_sha256") and meta.get("query"): + updates["query_sha256"] = ec.query_sha256(str(meta["query"])) + # Pull through fields we may not have captured originally. + for field in ( + "job_name", + "user", + "agent_name", + "build_owner", + "environment_name", + "share_status", + ): + val = getattr(primary, field, None) + if val is not None and meta.get(field) is None: + updates[field] = val + created_at = getattr(primary, "created_at", None) + if created_at is not None and not meta.get("created_at"): + updates["created_at"] = ec._to_iso(created_at) # pylint: disable=protected-access + + meta.update(updates) + meta_path.write_text(yaml.safe_dump(meta, sort_keys=False, allow_unicode=True, width=100)) + + return {"path": str(meta_path), "status": "enriched", "wrote": wrote} + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument( + "--research-dir", + type=Path, + default=DEFAULT_RESEARCH_DIR, + help="Directory containing *-edison-*-meta.yaml files.", + ) + ap.add_argument( + "--pattern", + default="*-edison-*-meta.yaml", + help="Glob pattern for meta yamls to consider.", + ) + ap.add_argument( + "--force", action="store_true", help="Re-write sidecars even if they already exist." + ) + ap.add_argument( + "--dry-run", + action="store_true", + help="Show what would be enriched without making API calls.", + ) + args = ap.parse_args(argv) + + if not args.research_dir.is_dir(): + print(f"Research dir not found: {args.research_dir}", file=sys.stderr) + return 2 + + meta_paths = sorted(args.research_dir.glob(args.pattern)) + if not meta_paths: + print(f"No metas matched {args.pattern} under {args.research_dir}") + return 0 + + try: + shown = args.research_dir.relative_to(REPO_ROOT) + except ValueError: + shown = args.research_dir + print(f"Considering {len(meta_paths)} meta yamls in {shown}/") + + client = None + if not args.dry_run: + api_key = rce.load_api_key() + from edison_client import EdisonClient + + client = EdisonClient(api_key=api_key) + + results: list[dict[str, Any]] = [] + try: + for meta_path in meta_paths: + results.append(enrich_one(client, meta_path, force=args.force, dry_run=args.dry_run)) + finally: + if client is not None: + client.close() + + by_status: dict[str, int] = {} + for r in results: + by_status[r["status"]] = by_status.get(r["status"], 0) + 1 + print() + print("Summary:") + for k, v in sorted(by_status.items()): + print(f" {k:>22}: {v}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/research_community_edison.py b/scripts/research_community_edison.py new file mode 100644 index 00000000..ca97b6a8 --- /dev/null +++ b/scripts/research_community_edison.py @@ -0,0 +1,334 @@ +#!/usr/bin/env python3 +"""Run Edison Scientific deep research against CommunityMech community records. + +Uses the `edison-client` SDK directly. The companion `research_community.py` +wraps `deep-research-client`, but as of DRC 0.2.4 only `cyberian` and +`openai` are registered providers — Edison/PaperQA is not exposed there, +so we drive the SDK directly. This is the CommunityMech port of +CultureMech's `research_media_edison.py`; the response-capture plumbing +(`_edison_capture.py`) is vendored verbatim and shared by both. + +The default job is LITERATURE (== `job-futurehouse-paperqa3`), the +PaperQA agent — the best fit for "what is this community's composition, +what are its ecological interactions and mechanisms, what environment +and conditions support it"-type questions. Use ``--job literature-high`` +for the deeper variant (more reads, higher cost), ``--job precedent`` +for first-mention search, ``--job phoenix`` for synthesis. + +Auth: reads ``EDISON_PLATFORM_API_KEY`` (SDK-native) or ``EDISON_API_KEY`` +(the key already in this repo's ``.env``) from environment. A repo-root +``.env`` is auto-loaded via python-dotenv. + +Outputs land under ``research/communities/{slug}-edison-{job}.md`` +(``slug`` = community YAML stem, ``job`` = lowercase-hyphenated job +name, e.g. ``literature-high``). A sibling ``{slug}-edison-{job}-meta.yaml`` +captures the rendered query text, task_id, total_cost, status, +template_path, and template_vars — sufficient for audit and re-runs. + +Usage:: + + # single record (filename stem, id, or path) + python scripts/research_community_edison.py --target Yogurt_TwoSpecies_Starter_Culture + + # by CommunityMech id + python scripts/research_community_edison.py --target CommunityMech:000164 + + # batch from a JSON list of stems / ids / paths + python scripts/research_community_edison.py --batch queue.json --limit 5 + + # dry-run skips the API call but still writes the meta yaml (including + # the full rendered query) so you can inspect the prompt that would + # have been sent without spending credits. + python scripts/research_community_edison.py --target --dry-run +""" +from __future__ import annotations + +import argparse +import json +import os +import sys +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import yaml +from dotenv import load_dotenv + +REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(REPO_ROOT / "scripts")) +import research_community as rc # noqa: E402 -- reuse template_vars + loaders +import _edison_capture as ec # noqa: E402 -- response/citation/agent capture + +DEFAULT_TEMPLATE = REPO_ROOT / "templates" / "community_mechanism_research.md" +DEFAULT_OUT_DIR = REPO_ROOT / "research" / "communities" + + +_JOB_ALIASES: dict[str, str] = { + "literature": "LITERATURE", + "paperqa": "LITERATURE", + "literature-high": "LITERATURE_HIGH", + "literature_high": "LITERATURE_HIGH", + "paperqa-high": "LITERATURE_HIGH", + "precedent": "PRECEDENT", + "phoenix": "PHOENIX", +} + + +def resolve_job(name: str): + """Map a user-friendly --job alias to the edison_client JobNames enum.""" + from edison_client import JobNames + + key = _JOB_ALIASES.get(name.lower()) + if key is None: + raise SystemExit( + f"Unknown --job '{name}'. Choose one of: " + ", ".join(sorted(_JOB_ALIASES)) + ) + return getattr(JobNames, key) + + +def load_api_key() -> str: + """Pick up the Edison key from env (with the legacy alias). + + The SDK natively reads ``EDISON_PLATFORM_API_KEY``; this repo's + ``.env`` sets ``EDISON_API_KEY``. Honor both. + """ + load_dotenv(REPO_ROOT / ".env") + key = os.environ.get("EDISON_PLATFORM_API_KEY") or os.environ.get("EDISON_API_KEY") + if not key: + raise SystemExit( + "EDISON_PLATFORM_API_KEY (or EDISON_API_KEY) is not set. " + "Add it to .env at the repo root, or `export EDISON_PLATFORM_API_KEY=...` " + "in your shell." + ) + return key + + +def render_query( + community_path: Path, template_path: Path, doc: dict[str, Any] | None = None +) -> tuple[str, dict[str, str]]: + """Render the deep-research template for a single community. + + Returns ``(query_text, template_vars)`` so callers can stamp the + variables into the meta file alongside the rendered query. ``doc`` + may be passed in to reuse an already-parsed YAML and avoid a second + read. + """ + if doc is None: + doc = rc.load_community(community_path) + variables = rc.template_vars(doc, community_path) + template = template_path.read_text() + return template.format_map(_DefaultEmpty(variables)), variables + + +class _DefaultEmpty(dict): + """``str.format_map`` helper: leave unknown placeholders blank instead of KeyError.""" + + def __missing__(self, key): # noqa: ANN001 + return "" + + +def slug_for(community_path: Path) -> str: + """Stable filename slug for output naming (the YAML file stem).""" + return community_path.stem + + +def _short_job(job) -> str: + """CLI-friendly filename suffix: ``JobNames.LITERATURE_HIGH`` -> ``literature-high``.""" + return job.name.lower().replace("_", "-") + + +def _display_path(path: Path) -> str: + """Show ``path`` relative to the repo when possible; else absolute.""" + try: + return str(path.relative_to(REPO_ROOT)) + except ValueError: + return str(path) + + +def run_one( + client, + community_path: Path, + job, + template_path: Path, + out_dir: Path, + dry_run: bool, +) -> dict[str, Any]: + """Submit one task; write results to out_dir; return a stats dict. + + On a successful API call, ``_edison_capture.capture_full_response`` + writes a primary answer .md plus four sibling files + (-response.json, -citations.md, -agent-state.json, -files.json) + for full provenance. See scripts/_edison_capture.py for details. + """ + from edison_client import TaskRequest + + doc = rc.load_community(community_path) + query, variables = render_query(community_path, template_path, doc) + slug = slug_for(community_path) + job_short = _short_job(job) + stem = f"{slug}-edison-{job_short}" + meta_path = out_dir / f"{stem}-meta.yaml" + + def _safe_rel(p: Path) -> str: + try: + return str(p.resolve().relative_to(REPO_ROOT)) + except ValueError: + return str(p) + + base_meta: dict[str, Any] = { + "slug": slug, + "community_path": _safe_rel(community_path), + "community_id": str(doc.get("id") or ""), + "job": job.name, + "job_id": job.value, + "template_path": _safe_rel(template_path), + "template_vars": variables, + "query_chars": len(query), + "query": query, + "submitted_at": datetime.now(timezone.utc).isoformat(), + } + + if dry_run: + # Render the meta yaml even in dry-run so callers can audit + # exactly what would be sent (and compare query_sha256 to detect + # identical re-runs). No .md is written; only meta. + meta = ec.capture_dry_run(out_dir=out_dir, stem=stem, query=query, base_meta=base_meta) + out_dir.mkdir(parents=True, exist_ok=True) + meta_path.write_text( + yaml.safe_dump(meta, sort_keys=False, allow_unicode=True, width=100) + ) + md_path = out_dir / f"{stem}.md" + print(f"[DRY RUN] {_display_path(community_path)} -> {_display_path(md_path)}") + print(f" job={job.name} query_chars={len(query)} meta={_display_path(meta_path)}") + return {"slug": slug, "status": "dry-run", "cost": 0.0} + + out_dir.mkdir(parents=True, exist_ok=True) + task = TaskRequest(name=job, query=query) + print(f" + submitting {slug} ({job.name})...", flush=True) + [response] = client.run_tasks_until_done(task, progress_bar=False) + + meta = ec.capture_full_response( + response=response, + client=client, + out_dir=out_dir, + stem=stem, + query=query, + base_meta=base_meta, + ) + meta_path.write_text(yaml.safe_dump(meta, sort_keys=False, allow_unicode=True, width=100)) + md_path = out_dir / f"{stem}.md" + total_cost = meta.get("total_cost") + print( + f" -> {_display_path(md_path)} cost={total_cost} " + f"citations={meta.get('citations_parsed')} " + f"agent_state={meta.get('sidecar_files', {}).get('agent_state_json', False)}" + ) + return {"slug": slug, "status": meta["status"], "cost": total_cost or 0.0} + + +def load_batch_targets(batch_path: Path) -> list[str]: + """Return a list of target strings from a JSON batch file. + + Accepts either a JSON list of strings (stems/ids/paths) or a list of + objects carrying one of ``target`` / ``slug`` / ``id`` / ``file_path``. + Entries that yield no usable target are skipped. + """ + data = json.loads(batch_path.read_text()) + if not isinstance(data, list): + raise SystemExit(f"--batch expects a JSON list: {batch_path}") + out: list[str] = [] + for entry in data: + if isinstance(entry, str): + out.append(entry) + elif isinstance(entry, dict): + for key in ("target", "slug", "id", "file_path"): + if entry.get(key): + out.append(str(entry[key])) + break + return out + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description=__doc__) + src = ap.add_mutually_exclusive_group(required=True) + src.add_argument("--target", help="Community path, filename stem, or CommunityMech id.") + src.add_argument("--batch", type=Path, help="Path to a JSON list of stems/ids/paths.") + ap.add_argument( + "--job", + default="literature", + help="literature (paperqa3, default) | literature-high | precedent | phoenix", + ) + ap.add_argument("--template", type=Path, default=DEFAULT_TEMPLATE) + ap.add_argument("--out-dir", type=Path, default=DEFAULT_OUT_DIR) + ap.add_argument( + "--limit", type=int, default=None, help="When using --batch, cap the number researched." + ) + ap.add_argument( + "--start", type=int, default=0, help="When using --batch, skip this many entries first." + ) + ap.add_argument( + "--dry-run", action="store_true", help="Render queries + print plan; do NOT call the API." + ) + args = ap.parse_args(argv) + + job = resolve_job(args.job) + + targets: list[Path] + if args.target: + targets = [rc.resolve_community_file(args.target)] + else: + names = load_batch_targets(args.batch)[args.start :] + if args.limit is not None: + names = names[: args.limit] + targets = [] + unresolved: list[str] = [] + for name in names: + try: + targets.append(rc.resolve_community_file(name)) + except FileNotFoundError: + unresolved.append(name) + if unresolved: + print(f"Note: skipped {len(unresolved)} unresolvable batch entries:", file=sys.stderr) + for u in unresolved[:5]: + print(f" - {u}", file=sys.stderr) + if len(unresolved) > 5: + print(f" - ... {len(unresolved) - 5} more", file=sys.stderr) + + if not targets: + print("No targets to research.", file=sys.stderr) + return 2 + + print(f"Edison job: {job.name} ({job.value})") + print(f"Template: {_display_path(args.template.resolve())}") + print(f"Output dir: {_display_path(args.out_dir.resolve())}") + print(f"Communities: {len(targets)}") + if args.dry_run: + print("Mode: DRY RUN (no API calls, no credits spent)") + print() + + client = None + if not args.dry_run: + api_key = load_api_key() + from edison_client import EdisonClient + + client = EdisonClient(api_key=api_key) + + results: list[dict[str, Any]] = [] + try: + for community_path in targets: + results.append( + run_one(client, community_path, job, args.template, args.out_dir, args.dry_run) + ) + finally: + if client is not None: + client.close() + + if not args.dry_run: + total_cost = sum(r["cost"] or 0.0 for r in results) + print() + print(f"Done. {len(results)} communities researched. Total reported cost: {total_cost:.4f}") + return 0 + + +if __name__ == "__main__": + sys.exit(main())