diff --git a/.github/instructions/scripts.instructions.md b/.github/instructions/scripts.instructions.md index 536370c0..5e650a1a 100644 --- a/.github/instructions/scripts.instructions.md +++ b/.github/instructions/scripts.instructions.md @@ -15,3 +15,7 @@ See CLAUDE.md "Development Workflow" for usage. All scripts require the Docker c - Set `PGPKG_LOCAL_REPO_DIR` on the host when you need to force a local pgpkg checkout for `stageversion`, `makemigration`, or related container-script testing - Tagged releases run `.github/workflows/release.yml`, which publishes both `pypgstac` and `pgstac-migrate` to PyPI via the GitHub `pypi` environment; PyPI trusted publishers must exist for both projects - DO NOT run `stageversion` without understanding its side effects +- Benchmark fixture/reporting scripts: + - `scripts/benchmark_fetch_pc_fixtures.py` materializes deterministic Planetary Computer fixtures from `benchmarks/fixtures/planetary-computer/manifest.json` + - `scripts/benchmark_run.py` runs ingest/hydrate/storage benchmarks and writes JSON/CSV/Markdown artifacts + - `scripts/benchmark_compare_results.py` compares two benchmark JSON reports and emits machine-readable deltas diff --git a/.github/workflows/benchmark-compare.yml b/.github/workflows/benchmark-compare.yml new file mode 100644 index 00000000..69a3bc99 --- /dev/null +++ b/.github/workflows/benchmark-compare.yml @@ -0,0 +1,99 @@ +name: Benchmark Compare (manual) + +on: + workflow_dispatch: + inputs: + base_ref: + description: "Base ref to compare against" + required: true + default: "origin/main" + hydrate_iterations: + description: "Hydrate benchmark iterations" + required: true + default: "5" + +permissions: + contents: read + +jobs: + compare: + runs-on: ubuntu-latest + services: + postgres: + image: ghcr.io/stac-utils/pgstac-postgres:main-pg17 + env: + POSTGRES_USER: username + POSTGRES_PASSWORD: password + POSTGRES_DB: postgis + ports: + - 5439:5432 + options: >- + --health-cmd pg_isready + 
--health-interval 10s + --health-timeout 5s + --health-retries 10 + env: + PGHOST: localhost + PGPORT: 5439 + PGUSER: username + PGPASSWORD: password + PGDATABASE: postgis + + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + fetch-depth: 0 + + - uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0 + + - name: Fetch deterministic benchmark fixtures + run: | + uv run --no-project --with psycopg[binary] \ + python scripts/benchmark_fetch_pc_fixtures.py \ + --manifest benchmarks/fixtures/planetary-computer/manifest.json \ + --output-dir /tmp/pgstac-benchmark-fixtures + + - name: Add base worktree + run: | + git fetch --no-tags origin main + git worktree add /tmp/pgstac-benchmark-base "${{ inputs.base_ref }}" + + - name: Benchmark base ref + run: | + uv run --no-project --with psycopg[binary] \ + python scripts/benchmark_run.py \ + --fixtures-dir /tmp/pgstac-benchmark-fixtures \ + --repo-root /tmp/pgstac-benchmark-base \ + --label base \ + --hydrate-iterations "${{ inputs.hydrate_iterations }}" \ + --output-dir /tmp/pgstac-benchmark-results + + - name: Benchmark head ref + run: | + uv run --no-project --with psycopg[binary] \ + python scripts/benchmark_run.py \ + --fixtures-dir /tmp/pgstac-benchmark-fixtures \ + --repo-root "$GITHUB_WORKSPACE" \ + --label head \ + --hydrate-iterations "${{ inputs.hydrate_iterations }}" \ + --output-dir /tmp/pgstac-benchmark-results + + - name: Compare benchmark outputs + run: | + uv run --no-project --with psycopg[binary] \ + python scripts/benchmark_compare_results.py \ + --base /tmp/pgstac-benchmark-results/base.json \ + --head /tmp/pgstac-benchmark-results/head.json \ + --output-dir /tmp/pgstac-benchmark-comparison + + - name: Publish comparison summary + run: cat /tmp/pgstac-benchmark-comparison/comparison.md >> "$GITHUB_STEP_SUMMARY" + + - name: Upload benchmark comparison artifacts + uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3 + 
with: + name: benchmark-compare-${{ github.sha }} + path: | + /tmp/pgstac-benchmark-fixtures/fixture-summary.json + /tmp/pgstac-benchmark-results/ + /tmp/pgstac-benchmark-comparison/ diff --git a/.github/workflows/benchmark-fixtures.yml b/.github/workflows/benchmark-fixtures.yml new file mode 100644 index 00000000..f1a83f3c --- /dev/null +++ b/.github/workflows/benchmark-fixtures.yml @@ -0,0 +1,73 @@ +name: Benchmark Fixtures + +on: + pull_request: + paths: + - '.github/workflows/benchmark-fixtures.yml' + - 'scripts/benchmark_*' + - 'benchmarks/fixtures/**' + - 'src/pgstac/sql/**' + - 'src/pypgstac/**' + workflow_dispatch: + schedule: + - cron: '17 6 * * 1' + +permissions: + contents: read + +jobs: + benchmark: + runs-on: ubuntu-latest + services: + postgres: + image: ghcr.io/stac-utils/pgstac-postgres:main-pg17 + env: + POSTGRES_USER: username + POSTGRES_PASSWORD: password + POSTGRES_DB: postgis + ports: + - 5439:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 10 + env: + PGHOST: localhost + PGPORT: 5439 + PGUSER: username + PGPASSWORD: password + PGDATABASE: postgis + + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0 + + - name: Fetch deterministic benchmark fixtures + run: | + uv run --no-project --with psycopg[binary] \ + python scripts/benchmark_fetch_pc_fixtures.py \ + --manifest benchmarks/fixtures/planetary-computer/manifest.json \ + --output-dir /tmp/pgstac-benchmark-fixtures + + - name: Run benchmark suite + run: | + uv run --no-project --with psycopg[binary] \ + python scripts/benchmark_run.py \ + --fixtures-dir /tmp/pgstac-benchmark-fixtures \ + --repo-root "$GITHUB_WORKSPACE" \ + --label "$GITHUB_SHA" \ + --hydrate-iterations 5 \ + --output-dir /tmp/pgstac-benchmark-results + + - name: Publish benchmark summary + run: cat /tmp/pgstac-benchmark-results/"$GITHUB_SHA".md >> 
"$GITHUB_STEP_SUMMARY" + + - name: Upload benchmark artifacts + uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3 + with: + name: benchmark-fixtures-${{ github.sha }} + path: | + /tmp/pgstac-benchmark-fixtures/fixture-summary.json + /tmp/pgstac-benchmark-results/ diff --git a/AGENTS.md b/AGENTS.md index 8e4ee00d..8463a388 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -53,3 +53,18 @@ Specialist in pypgstac bulk loading (`src/pypgstac/src/pypgstac/load.py`). See C - **Retry scope**: `CheckViolation`, `DeadlockDetected`, `SerializationFailure`, `LockNotAvailable`, `ObjectInUse` - **Load modes**: `insert`, `ignore`/`insert_ignore`, `upsert`, `delsert` - Test: `scripts/runinpypgstac --build test --pypgstac` + +--- + +## benchmark-engineer + +Benchmark fixture and reporting specialist for PgSTAC load/hydrate/storage comparisons. + +### Commands + +- Generate deterministic Planetary Computer fixtures (1000 per collection): + `uv run --no-project --with psycopg[binary] python scripts/benchmark_fetch_pc_fixtures.py --manifest benchmarks/fixtures/planetary-computer/manifest.json --output-dir benchmarks/fixtures/planetary-computer/data` +- Run benchmark suite: + `uv run --no-project --with psycopg[binary] python scripts/benchmark_run.py --fixtures-dir benchmarks/fixtures/planetary-computer/data --repo-root "$PWD" --label local --output-dir /tmp/pgstac-benchmark-results` +- Compare benchmark JSON outputs: + `uv run --no-project --with psycopg[binary] python scripts/benchmark_compare_results.py --base /tmp/pgstac-benchmark-results/base.json --head /tmp/pgstac-benchmark-results/head.json --output-dir /tmp/pgstac-benchmark-comparison` diff --git a/CHANGELOG.md b/CHANGELOG.md index d62475f4..5f1d5c5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Add deterministic SHA-256 `content_hash` to STAC items to track data changes across migrations. 
- Add `pgstac_updated_at` column to items table as part of separating STAC property updates from database metadata updates. +- Deterministic Planetary Computer benchmark fixture manifest + fetch tooling for `naip`, `sentinel-2-l2a`, and `landsat-c2-l2` (1000 items per collection), plus CI/manual benchmark workflows that emit JSON/CSV/Markdown artifacts and branch comparison reports. ### Changed diff --git a/CLAUDE.md b/CLAUDE.md index 364618f5..ea57fa30 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -93,6 +93,24 @@ scripts/test --pgdump # pg_dump/pg_restore round-trip test All tests run inside Docker via `scripts/runinpypgstac`. Use `--build` to rebuild images first. +### Benchmark Fixtures and Reporting + +```bash +uv run --no-project --with psycopg[binary] python scripts/benchmark_fetch_pc_fixtures.py \ + --manifest benchmarks/fixtures/planetary-computer/manifest.json \ + --output-dir benchmarks/fixtures/planetary-computer/data + +uv run --no-project --with psycopg[binary] python scripts/benchmark_run.py \ + --fixtures-dir benchmarks/fixtures/planetary-computer/data \ + --repo-root "$PWD" \ + --label local \ + --output-dir /tmp/pgstac-benchmark-results +``` + +GitHub Actions: +- `.github/workflows/benchmark-fixtures.yml` for fixture-based benchmark artifact generation +- `.github/workflows/benchmark-compare.yml` for manual base-vs-head comparison reports + ### Docker Architecture - **pgstac** container: PostgreSQL 17 + PostGIS 3 + extensions, port 5439→5432 diff --git a/benchmarks/fixtures/planetary-computer/.gitignore b/benchmarks/fixtures/planetary-computer/.gitignore new file mode 100644 index 00000000..e95cac97 --- /dev/null +++ b/benchmarks/fixtures/planetary-computer/.gitignore @@ -0,0 +1,2 @@ +data/** +!data/.gitkeep diff --git a/benchmarks/fixtures/planetary-computer/README.md b/benchmarks/fixtures/planetary-computer/README.md new file mode 100644 index 00000000..1c7f117d --- /dev/null +++ b/benchmarks/fixtures/planetary-computer/README.md @@ -0,0 +1,38 @@ +# 
Planetary Computer benchmark fixtures + +This directory defines reproducible benchmark fixtures for PgSTAC load-path benchmarking. + +## Collections + +- `naip` +- `sentinel-2-l2a` +- `landsat-c2-l2` + +Each fixture set materializes: + +- one collection document (`collection.json`) +- exactly 1000 STAC items (`items.ndjson`) + +## Why fixtures are generated + +Committing 3000 raw Planetary Computer items would add a large and frequently-changing payload to the repository. +Instead, this directory commits a deterministic fixture manifest plus a fetch script. + +## Generate fixtures + +From repository root: + +```bash +uv run --no-project --with psycopg[binary] python scripts/benchmark_fetch_pc_fixtures.py \ + --manifest benchmarks/fixtures/planetary-computer/manifest.json \ + --output-dir benchmarks/fixtures/planetary-computer/data +``` + +## Validate generated fixtures + +```bash +uv run --no-project --with psycopg[binary] python scripts/benchmark_fetch_pc_fixtures.py \ + --manifest benchmarks/fixtures/planetary-computer/manifest.json \ + --output-dir benchmarks/fixtures/planetary-computer/data \ + --validate-only +``` diff --git a/benchmarks/fixtures/planetary-computer/data/.gitkeep b/benchmarks/fixtures/planetary-computer/data/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/benchmarks/fixtures/planetary-computer/manifest.json b/benchmarks/fixtures/planetary-computer/manifest.json new file mode 100644 index 00000000..67b03725 --- /dev/null +++ b/benchmarks/fixtures/planetary-computer/manifest.json @@ -0,0 +1,36 @@ +{ + "api_url": "https://planetarycomputer.microsoft.com/api/stac/v1", + "item_count": 1000, + "collections": [ + { + "id": "naip", + "datetime": "../2025-01-01T00:00:00Z", + "sortby": [ + { + "field": "id", + "direction": "asc" + } + ] + }, + { + "id": "sentinel-2-l2a", + "datetime": "../2025-01-01T00:00:00Z", + "sortby": [ + { + "field": "id", + "direction": "asc" + } + ] + }, + { + "id": "landsat-c2-l2", + "datetime": 
# (section, metric) pairs read from every per-collection entry of a report.
TRACKED_METRICS = (
    ("ingest", "ms_per_item"),
    ("hydrate", "ms_per_item"),
    ("storage", "bytes_per_item"),
)


def load_report(path: Path) -> dict[str, Any]:
    """Load a benchmark JSON report from *path* (decoded as UTF-8)."""
    with path.open("r", encoding="utf-8") as src:
        return json.load(src)


def index_by_collection(report: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Key the entries of ``report["collections"]`` by their ``collection_id``.

    A report without a ``collections`` key yields an empty mapping.
    """
    return {entry["collection_id"]: entry for entry in report.get("collections", [])}


def _delta_row(
    collection: str,
    metric: str,
    base_value: float,
    head_value: float,
) -> dict[str, Any]:
    """Build one comparison row; ``delta_pct`` is ``None`` when the base is zero."""
    delta = head_value - base_value
    return {
        "collection": collection,
        "metric": metric,
        "base": base_value,
        "head": head_value,
        "delta": delta,
        "delta_pct": (delta / base_value * 100.0) if base_value else None,
    }


def compare(base: dict[str, Any], head: dict[str, Any]) -> list[dict[str, Any]]:
    """Return per-metric deltas between two benchmark reports.

    One row is emitted for every ``TRACKED_METRICS`` entry of every collection
    present in *both* reports (rows with a ``None`` value on either side are
    skipped), followed by a single ``_all`` row for
    ``global_storage.item_fragments_total_bytes``.

    Raises:
        KeyError: if a shared collection lacks a tracked section/metric, or a
            report lacks ``global_storage``.
    """
    base_by_collection = index_by_collection(base)
    head_by_collection = index_by_collection(head)

    rows: list[dict[str, Any]] = []
    # Only collections that exist on both sides can be compared.
    for collection_id in sorted(base_by_collection.keys() & head_by_collection.keys()):
        base_entry = base_by_collection[collection_id]
        head_entry = head_by_collection[collection_id]
        for section, metric in TRACKED_METRICS:
            base_raw = base_entry[section][metric]
            head_raw = head_entry[section][metric]
            if base_raw is None or head_raw is None:
                continue
            rows.append(
                _delta_row(
                    collection_id,
                    f"{section}.{metric}",
                    float(base_raw),
                    float(head_raw),
                ),
            )

    # The global row goes through the same helper so that delta_pct follows the
    # same rule as every other row instead of always being None.
    rows.append(
        _delta_row(
            "_all",
            "global.item_fragments_total_bytes",
            float(base["global_storage"].get("item_fragments_total_bytes", 0)),
            float(head["global_storage"].get("item_fragments_total_bytes", 0)),
        ),
    )
    return rows


def write_csv(rows: list[dict[str, Any]], path: Path) -> None:
    """Write comparison *rows* to *path* as CSV with a header line."""
    fieldnames = ["collection", "metric", "base", "head", "delta", "delta_pct"]
    with path.open("w", newline="", encoding="utf-8") as dst:
        writer = csv.DictWriter(dst, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def write_markdown(
    rows: list[dict[str, Any]],
    base_label: str,
    head_label: str,
    output_path: Path,
) -> None:
    """Render comparison *rows* as a Markdown table written to *output_path*."""
    lines = [
        f"# Benchmark comparison: {base_label} → {head_label}",
        "",
        "| Collection | Metric | Base | Head | Delta | Delta % |",
        "|---|---:|---:|---:|---:|---:|",
    ]

    for row in rows:
        delta_pct = "n/a" if row["delta_pct"] is None else f"{row['delta_pct']:.2f}%"
        lines.append(
            f"| {row['collection']} | {row['metric']} | {row['base']:.6f} | "
            f"{row['head']:.6f} | {row['delta']:.6f} | {delta_pct} |",
        )

    output_path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def main() -> int:
    """CLI entry point: compare ``--base`` and ``--head`` reports.

    Writes ``comparison.csv``, ``comparison.json`` and ``comparison.md`` into
    ``--output-dir`` and prints the JSON rows to stdout.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--base", type=Path, required=True)
    parser.add_argument("--head", type=Path, required=True)
    parser.add_argument("--output-dir", type=Path, required=True)
    args = parser.parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)

    base_report = load_report(args.base)
    head_report = load_report(args.head)
    rows = compare(base_report, head_report)

    csv_path = args.output_dir / "comparison.csv"
    json_path = args.output_dir / "comparison.json"
    md_path = args.output_dir / "comparison.md"

    write_csv(rows, csv_path)
    json_path.write_text(json.dumps(rows, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    write_markdown(rows, base_report["label"], head_report["label"], md_path)
    print(json.dumps(rows, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
def request_json(url: str, method: str = "GET", body: dict[str, Any] | None = None) -> dict[str, Any]:
    """Perform an HTTP request and return the decoded JSON response.

    Args:
        url: Absolute URL to request.
        method: HTTP method name.
        body: Optional JSON-serialisable payload; when given it is sent UTF-8
            encoded with a ``Content-Type: application/json`` header.

    Raises:
        RuntimeError: if the request fails, times out, or the response body is
            not valid JSON. The original exception is chained as the cause.
    """
    headers = {"Accept": "application/json"}
    payload = None
    if body is not None:
        payload = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"

    req = Request(url=url, data=payload, method=method, headers=headers)
    try:
        with urlopen(req, timeout=120) as response:
            return json.load(response)
    # ValueError covers json.JSONDecodeError / UnicodeDecodeError so that a
    # non-JSON response (e.g. an HTML error page) is reported with the same
    # URL/method context as a transport failure.
    except (HTTPError, URLError, TimeoutError, ValueError) as exc:
        raise RuntimeError(f"Request failed for {method} {url}: {exc}") from exc


def hash_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 64 KiB chunks."""
    sha = hashlib.sha256()
    with path.open("rb") as src:
        for chunk in iter(lambda: src.read(65536), b""):
            sha.update(chunk)
    return sha.hexdigest()


def find_next_link(page: dict[str, Any]) -> dict[str, Any] | None:
    """Return the first entry of ``page["links"]`` whose ``rel`` is ``"next"``, if any."""
    return next(
        (link for link in page.get("links", []) if link.get("rel") == "next"),
        None,
    )


def normalize_item(item: dict[str, Any], collection_id: str) -> dict[str, Any]:
    """Return *item* unchanged after checking that it belongs to *collection_id*.

    Raises:
        ValueError: if ``item["collection"]`` differs from *collection_id*.
    """
    if item.get("collection") != collection_id:
        raise ValueError(
            f"Item {item.get('id')} does not belong to {collection_id}: {item.get('collection')}",
        )
    return item
def fetch_collection(
    api_url: str,
    collection_cfg: dict[str, Any],
    item_count: int,
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Fetch a collection document and its first *item_count* items.

    Items are requested from ``<api_url>/search`` with a POST body built from
    *collection_cfg* (``id``, optional ``sortby`` and ``datetime``) and the
    ``next`` links of each page are followed until enough items are collected.

    Returns:
        ``(collection_document, items)`` with exactly *item_count* items.

    Raises:
        RuntimeError: if fewer than *item_count* items are available or the
            collected items contain duplicate ids.
        ValueError: if an item reports a different collection (raised by
            ``normalize_item``).
    """
    collection_id = collection_cfg["id"]
    api_root = f"{api_url.rstrip('/')}/"
    collection_doc = request_json(urljoin(api_root, f"collections/{collection_id}"))

    search_url = urljoin(api_root, "search")
    request_body: dict[str, Any] = {
        "collections": [collection_id],
        "limit": min(250, item_count),
        "sortby": collection_cfg.get(
            "sortby",
            [{"field": "id", "direction": "asc"}],
        ),
    }
    if "datetime" in collection_cfg:
        request_body["datetime"] = collection_cfg["datetime"]

    page_url = search_url
    page = request_json(page_url, method="POST", body=request_body)
    features: list[dict[str, Any]] = []

    while True:
        features.extend(
            normalize_item(item, collection_id) for item in page.get("features", [])
        )
        if len(features) >= item_count:
            break

        next_link = find_next_link(page)
        if not next_link:
            break

        href = next_link.get("href")
        if href is None:
            break
        # Resolve against the page that produced the link so that relative
        # hrefs work; absolute hrefs pass through urljoin unchanged.
        next_url = urljoin(page_url, href)

        method = str(next_link.get("method", "GET")).upper()
        if method == "POST":
            link_body = next_link.get("body")
            if not isinstance(link_body, dict):
                # No usable body on the link: repeat the previous request body.
                link_body = request_body
            elif next_link.get("merge"):
                # STAC API pagination: with "merge": true the link body only
                # carries the changed fields (typically the paging token) and
                # must be combined with the request that produced this page.
                link_body = {**request_body, **link_body}
            request_body = link_body
            page = request_json(next_url, method="POST", body=request_body)
        else:
            page = request_json(next_url)
        page_url = next_url

    if len(features) < item_count:
        raise RuntimeError(
            f"Only fetched {len(features)} items for {collection_id}; expected {item_count}",
        )

    features = features[:item_count]
    ids = [item.get("id") for item in features]
    if len(ids) != len(set(ids)):
        raise RuntimeError(f"Duplicate item ids detected for {collection_id}")

    return collection_doc, features
def validate_fixture(output_dir: Path, collection_id: str, item_count: int) -> None:
    """Check the on-disk fixture for *collection_id* under *output_dir*.

    Verifies that ``collection.json`` and ``items.ndjson`` exist, that the
    collection document has the expected id, and that the NDJSON file holds
    exactly *item_count* items with unique string ids that all belong to the
    collection. Blank lines in the NDJSON file are ignored.

    Raises:
        FileNotFoundError: if either fixture file is missing.
        ValueError: on any content mismatch, including a malformed JSON line
            (reported with its line number).
    """
    collection_path = output_dir / collection_id / "collection.json"
    items_path = output_dir / collection_id / "items.ndjson"

    if not collection_path.exists() or not items_path.exists():
        raise FileNotFoundError(f"Missing fixture files for {collection_id} in {output_dir}")

    with collection_path.open("r", encoding="utf-8") as src:
        collection_doc = json.load(src)
    if collection_doc.get("id") != collection_id:
        raise ValueError(
            f"collection.json id mismatch for {collection_id}: {collection_doc.get('id')}",
        )

    seen: set[str] = set()
    with items_path.open("r", encoding="utf-8") as src:
        for line_number, raw_line in enumerate(src, start=1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError as exc:
                # json reports positions within the single line only; add the
                # file line so a broken fixture can actually be located.
                raise ValueError(
                    f"Invalid JSON in {items_path} at line {line_number}: {exc}",
                ) from exc
            if item.get("collection") != collection_id:
                raise ValueError(
                    f"Item {item.get('id')} has wrong collection {item.get('collection')}",
                )
            item_id = item.get("id")
            if not isinstance(item_id, str):
                raise ValueError("Item id must be a string")
            if item_id in seen:
                raise ValueError(f"Duplicate item id in fixture for {collection_id}: {item_id}")
            seen.add(item_id)

    # Ids are unique at this point, so the set size is the item count.
    count = len(seen)
    if count != item_count:
        raise ValueError(
            f"Fixture for {collection_id} has {count} items, expected {item_count}",
        )


def _fixture_digest(collection_path: Path, items_path: Path, reused: bool) -> dict[str, Any]:
    """Build the per-collection entry stored in ``fixture-summary.json``."""
    return {
        "collection_sha256": hash_file(collection_path),
        "items_sha256": hash_file(items_path),
        "reused_existing": reused,
    }


def main() -> int:
    """CLI entry point: materialize (or only validate) the fixtures of a manifest.

    Existing fixtures are validated and reused unless ``--overwrite`` is given.
    A ``fixture-summary.json`` with SHA-256 digests is written to the output
    directory and echoed to stdout. With ``--validate-only`` nothing is fetched
    or written.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--manifest",
        type=Path,
        default=Path("benchmarks/fixtures/planetary-computer/manifest.json"),
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=Path("benchmarks/fixtures/planetary-computer/data"),
    )
    parser.add_argument("--overwrite", action="store_true")
    parser.add_argument("--validate-only", action="store_true")
    args = parser.parse_args()

    with args.manifest.open("r", encoding="utf-8") as src:
        manifest = json.load(src)

    api_url = manifest["api_url"]
    item_count = int(manifest["item_count"])
    collections = manifest["collections"]

    if args.validate_only:
        # Validation is read-only: do not create the output directory here.
        for collection_cfg in collections:
            validate_fixture(args.output_dir, collection_cfg["id"], item_count)
        print("Fixture validation successful.")
        return 0

    args.output_dir.mkdir(parents=True, exist_ok=True)

    summary: dict[str, Any] = {
        "api_url": api_url,
        "item_count": item_count,
        "collections": {},
    }

    for collection_cfg in collections:
        collection_id = collection_cfg["id"]
        target_dir = args.output_dir / collection_id
        target_dir.mkdir(parents=True, exist_ok=True)

        collection_path = target_dir / "collection.json"
        items_path = target_dir / "items.ndjson"

        if not args.overwrite and collection_path.exists() and items_path.exists():
            validate_fixture(args.output_dir, collection_id, item_count)
            summary["collections"][collection_id] = _fixture_digest(
                collection_path,
                items_path,
                True,
            )
            continue

        print(f"Fetching {collection_id} from {api_url} ...", file=sys.stderr)
        collection_doc, features = fetch_collection(api_url, collection_cfg, item_count)

        with collection_path.open("w", encoding="utf-8") as dst:
            json.dump(collection_doc, dst, indent=2, sort_keys=True)
            dst.write("\n")

        # Compact, key-sorted JSON keeps the NDJSON byte-stable across runs.
        with items_path.open("w", encoding="utf-8") as dst:
            for item in features:
                dst.write(json.dumps(item, separators=(",", ":"), sort_keys=True))
                dst.write("\n")

        validate_fixture(args.output_dir, collection_id, item_count)
        summary["collections"][collection_id] = _fixture_digest(
            collection_path,
            items_path,
            False,
        )

    summary_path = args.output_dir / "fixture-summary.json"
    with summary_path.open("w", encoding="utf-8") as dst:
        json.dump(summary, dst, indent=2, sort_keys=True)
        dst.write("\n")

    print(json.dumps(summary, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except Exception as exc:
        # Top-level boundary: report any failure on stderr and exit with 2.
        print(f"Fixture fetch failed: {exc}", file=sys.stderr)
        raise SystemExit(2)
@dataclass
class FixtureCollection:
    """Location of one collection's fixture files on disk."""

    # Name of the fixture directory; used as the collection id in queries.
    collection_id: str
    # Path to the collection document (collection.json).
    collection_path: Path
    # Path to the newline-delimited items file (items.ndjson).
    items_path: Path


def run(cmd: list[str], cwd: Path) -> None:
    """Run *cmd* in *cwd*; raises ``subprocess.CalledProcessError`` on non-zero exit."""
    subprocess.run(cmd, cwd=str(cwd), check=True)


def get_dsn() -> str:
    """Return the connection string to hand to ``psycopg.connect``.

    ``PG_DSN`` wins when set. Otherwise, if any of ``PGHOST``, ``PGDATABASE``,
    ``PGUSER`` or ``PGPORT`` is set, an empty string is returned so that the
    connection is configured from those environment variables.

    Raises:
        RuntimeError: if none of the variables above is set.
    """
    dsn = os.getenv("PG_DSN")
    if dsn:
        return dsn
    if any(os.getenv(name) for name in ("PGHOST", "PGDATABASE", "PGUSER", "PGPORT")):
        return ""
    raise RuntimeError(
        "Database connection is not configured. Set PG_DSN or PGHOST/PGDATABASE/PGUSER/PGPORT.",
    )


def reset_schema(repo_root: Path, pypgstac_dir: Path) -> None:
    """Drop the ``pgstac`` schema and re-create it with ``pypgstac migrate``.

    Args:
        repo_root: Working directory for the ``uv run`` invocation.
        pypgstac_dir: pypgstac project directory passed to ``uv run --directory``.

    Raises:
        RuntimeError: if *pypgstac_dir* does not exist or no database
            connection is configured.
        subprocess.CalledProcessError: if the migration command fails.
    """
    # Check the migration project first: dropping the schema and then failing
    # on a bad path would leave the database without pgstac.
    if not pypgstac_dir.exists():
        raise RuntimeError(f"pypgstac directory does not exist: {pypgstac_dir}")
    dsn = get_dsn()
    with psycopg.connect(dsn, autocommit=True) as conn:
        conn.execute("DROP SCHEMA IF EXISTS pgstac CASCADE;")
    run(["uv", "run", "--directory", str(pypgstac_dir), "pypgstac", "migrate"], cwd=repo_root)


def discover_fixture_collections(fixtures_dir: Path) -> list[FixtureCollection]:
    """Return the fixture collections found directly under *fixtures_dir*.

    A sub-directory counts as a fixture when it contains both
    ``collection.json`` and ``items.ndjson``; results are sorted by path.

    Raises:
        RuntimeError: if no fixture directory is found.
    """
    collections = [
        FixtureCollection(
            collection_id=collection_dir.name,
            collection_path=collection_dir / "collection.json",
            items_path=collection_dir / "items.ndjson",
        )
        for collection_dir in sorted(fixtures_dir.iterdir())
        if collection_dir.is_dir()
        and (collection_dir / "collection.json").exists()
        and (collection_dir / "items.ndjson").exists()
    ]
    if not collections:
        raise RuntimeError(f"No fixture collections found in {fixtures_dir}")
    return collections


def insert_collection(cur: psycopg.Cursor[Any], collection_path: Path) -> None:
    """Insert the collection document at *collection_path* into ``collections``."""
    content = collection_path.read_text(encoding="utf-8")
    cur.execute("INSERT INTO collections (content) VALUES (%s::jsonb);", (content,))
def ingest_collection(cur: psycopg.Cursor[Any], fixture: FixtureCollection) -> tuple[int, float]:
    """COPY every non-blank line of the fixture's NDJSON into ``items_staging``.

    Returns:
        ``(rows_written, elapsed_ms)``; the elapsed time covers reading the
        file and the whole COPY operation.
    """
    started = time.perf_counter()
    rows = 0
    with cur.copy("COPY items_staging (content) FROM stdin") as copy:
        with fixture.items_path.open("r", encoding="utf-8") as src:
            for raw_line in src:
                line = raw_line.strip()
                if not line:
                    continue
                copy.write_row((line,))
                rows += 1
    duration_ms = (time.perf_counter() - started) * 1000.0
    return rows, duration_ms


def hydrate_collection(cur: psycopg.Cursor[Any], collection_id: str, iterations: int) -> tuple[int, float]:
    """Time ``content_hydrate`` over every item of *collection_id*.

    The query is executed *iterations* times.

    Returns:
        ``(row_count, average_ms_per_iteration)``.

    Raises:
        ValueError: if *iterations* is not positive.
    """
    if iterations <= 0:
        raise ValueError(f"iterations must be a positive integer, got {iterations}")

    total_ms = 0.0
    rows = 0
    for _ in range(iterations):
        started = time.perf_counter()
        # count(content_hydrate(i)) makes the aggregate consume the hydrated
        # value. With count(*) over a subquery, PostgreSQL may drop the unused
        # output column and never call the function unless it is VOLATILE,
        # which would make this benchmark time a plain row count.
        # count(*) is still selected first so the reported row count does not
        # depend on whether content_hydrate can return NULL.
        cur.execute(
            """
            SELECT count(*), count(content_hydrate(i))
            FROM items i
            WHERE i.collection = %s
            """,
            (collection_id,),
        )
        rows = int(cur.fetchone()[0])
        total_ms += (time.perf_counter() - started) * 1000.0
    return rows, total_ms / iterations


def get_partition_key(cur: psycopg.Cursor[Any], collection_id: str) -> str:
    """Return ``collections.key`` for *collection_id* as a string.

    Raises:
        RuntimeError: if no row matches or the key is empty/NULL.
    """
    cur.execute("SELECT key FROM collections WHERE id = %s", (collection_id,))
    row = cur.fetchone()
    if not row or not row[0]:
        raise RuntimeError(f"Unable to find partition key for {collection_id}")
    return str(row[0])


def get_collection_storage(cur: psycopg.Cursor[Any], partition_key: str) -> dict[str, int]:
    """Sum on-disk sizes of the item partition tables for *partition_key*.

    Matches ordinary tables in schema ``pgstac`` named exactly
    ``_items_<key>`` or ``_items_<key>_<suffix>``.

    Returns:
        ``table_bytes``, ``index_bytes`` and ``total_bytes`` (0 when nothing
        matches).
    """
    # In LIKE, "_" matches any single character and a bare "<key>%" suffix also
    # matches longer keys (key 1 would include partitions of keys 10-19), so
    # wildcards are escaped and an explicit "_" separator is required.
    # NOTE(review): assumes sub-partitions are named "_items_<key>_<suffix>" —
    # confirm against the pgstac partition naming.
    escaped_key = (
        partition_key.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    cur.execute(
        """
        SELECT
            COALESCE(SUM(pg_relation_size(c.oid)), 0) AS table_bytes,
            COALESCE(SUM(pg_indexes_size(c.oid)), 0) AS index_bytes,
            COALESCE(SUM(pg_total_relation_size(c.oid)), 0) AS total_bytes
        FROM pg_class c
        JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE n.nspname = 'pgstac'
          AND c.relkind = 'r'
          AND (c.relname = %s OR c.relname LIKE %s)
        """,
        (f"_items_{partition_key}", f"\\_items\\_{escaped_key}\\_%"),
    )
    row = cur.fetchone()
    return {
        "table_bytes": int(row[0]),
        "index_bytes": int(row[1]),
        "total_bytes": int(row[2]),
    }
def get_global_storage(cur: psycopg.Cursor[Any]) -> dict[str, int]:
    """Report schema-wide storage figures for the ``pgstac`` schema.

    Returns:
        ``schema_total_bytes``: summed total relation size of all ordinary
        tables and materialized views in ``pgstac``;
        ``item_fragments_total_bytes``: total size of ``pgstac.item_fragments``,
        or 0 when that relation does not exist.
    """
    cur.execute(
        """
        SELECT COALESCE(SUM(pg_total_relation_size(c.oid)), 0)
        FROM pg_class c
        JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE n.nspname = 'pgstac'
          AND c.relkind IN ('r', 'm')
        """,
    )
    (schema_bytes,) = cur.fetchone()[:1]

    # to_regclass yields NULL instead of raising when the relation is absent.
    cur.execute("SELECT to_regclass('pgstac.item_fragments')")
    fragments_missing = cur.fetchone()[0] is None
    if fragments_missing:
        fragments_size = 0
    else:
        cur.execute("SELECT pg_total_relation_size('pgstac.item_fragments'::regclass)")
        fragments_size = int(cur.fetchone()[0])

    return {
        "schema_total_bytes": int(schema_bytes),
        "item_fragments_total_bytes": fragments_size,
    }


def per_item(value: float, rows: int) -> float | None:
    """Return *value* divided by *rows*, or ``None`` when *rows* is not positive."""
    return value / rows if rows > 0 else None
# (suite, metric name in the flat output, key inside result[suite], unit),
# in the order the rows are emitted for each collection.
_COLLECTION_METRICS = (
    ("ingest", "rows", "rows", "count"),
    ("ingest", "ingest_total_ms", "duration_ms", "ms"),
    ("ingest", "ingest_ms_per_item", "ms_per_item", "ms_per_item"),
    ("hydrate", "hydrate_rows", "rows", "count"),
    ("hydrate", "hydrate_avg_ms", "avg_ms", "ms"),
    ("hydrate", "hydrate_ms_per_item", "ms_per_item", "ms_per_item"),
    ("storage", "collection_table_bytes", "table_bytes", "bytes"),
    ("storage", "collection_index_bytes", "index_bytes", "bytes"),
    ("storage", "collection_total_bytes", "total_bytes", "bytes"),
    ("storage", "collection_bytes_per_item", "bytes_per_item", "bytes_per_item"),
)

# Keys of report["global_storage"] emitted under the "_all" pseudo-collection.
_GLOBAL_STORAGE_METRICS = ("schema_total_bytes", "item_fragments_total_bytes")


def flatten_metrics(report: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten a benchmark report into one row per (collection, metric).

    The two global storage metrics come first (collection ``_all``), followed
    by ten rows per entry of ``report["collections"]``. Every row has the keys
    ``label``, ``collection``, ``suite``, ``metric``, ``value`` and ``unit``.
    """
    label = report["label"]
    global_storage = report["global_storage"]

    flat: list[dict[str, Any]] = [
        {
            "label": label,
            "collection": "_all",
            "suite": "storage",
            "metric": name,
            "value": global_storage[name],
            "unit": "bytes",
        }
        for name in _GLOBAL_STORAGE_METRICS
    ]

    for result in report["collections"]:
        collection_id = result["collection_id"]
        for suite, metric, source_key, unit in _COLLECTION_METRICS:
            flat.append(
                {
                    "label": label,
                    "collection": collection_id,
                    "suite": suite,
                    "metric": metric,
                    "value": result[suite][source_key],
                    "unit": unit,
                },
            )
    return flat


def write_csv(rows: list[dict[str, Any]], path: Path) -> None:
    """Write flattened metric *rows* to *path* as CSV, header line first."""
    columns = ["label", "collection", "suite", "metric", "value", "unit"]
    with path.open("w", newline="", encoding="utf-8") as handle:
        out = csv.DictWriter(handle, fieldnames=columns)
        out.writeheader()
        for row in rows:
            out.writerow(row)
def write_markdown(report: dict[str, Any], path: Path) -> None:
    """Render a benchmark *report* as a Markdown summary written to *path*.

    The document has a header with the run parameters, one table row per
    collection (missing per-item values are shown as ``n/a``) and a closing
    "Global storage" section.
    """

    def shown(value: float | None, spec: str) -> str:
        # Per-item metrics are None when no rows were processed.
        return "n/a" if value is None else format(value, spec)

    doc = [
        f"# Benchmark results ({report['label']})",
        "",
        f"- Fixture directory: `{report['fixtures_dir']}`",
        f"- Repo root: `{report['repo_root']}`",
        f"- Hydrate iterations: `{report['hydrate_iterations']}`",
        "",
        "## Collection summary",
        "",
        "| Collection | Rows | Ingest ms/item | Hydrate ms/item | Storage bytes/item | Total bytes |",
        "|---|---:|---:|---:|---:|---:|",
    ]

    for entry in report["collections"]:
        ingest_cell = shown(entry["ingest"]["ms_per_item"], ".3f")
        hydrate_cell = shown(entry["hydrate"]["ms_per_item"], ".6f")
        storage_cell = shown(entry["storage"]["bytes_per_item"], ".2f")
        doc.append(
            f"| {entry['collection_id']} | {entry['ingest']['rows']} | {ingest_cell} | "
            f"{hydrate_cell} | {storage_cell} | {entry['storage']['total_bytes']} |",
        )

    totals = report["global_storage"]
    doc += [
        "",
        "## Global storage",
        "",
        f"- `schema_total_bytes`: {totals['schema_total_bytes']}",
        f"- `item_fragments_total_bytes`: {totals['item_fragments_total_bytes']}",
        # The trailing empty entry makes the joined text end with a newline.
        "",
    ]

    path.write_text("\n".join(doc), encoding="utf-8")
def run_benchmark(
    fixtures_dir: Path,
    repo_root: Path,
    pypgstac_dir: Path,
    label: str,
    hydrate_iterations: int,
) -> dict[str, Any]:
    """Reset the pgstac schema, load every fixture and collect metrics.

    All collection documents are inserted first; each collection is then
    ingested, hydrated *hydrate_iterations* times and measured for storage.

    Returns:
        A report dict with ``label``, ``repo_root``, ``fixtures_dir``,
        ``hydrate_iterations``, a ``collections`` list (``ingest``,
        ``hydrate`` and ``storage`` per collection) and ``global_storage``.

    Raises:
        ValueError: if *hydrate_iterations* is less than 1.
        RuntimeError: if no fixtures are found or the schema cannot be reset.
    """
    # Validate everything that can be validated before reset_schema() drops
    # the existing pgstac schema.
    if hydrate_iterations < 1:
        raise ValueError(f"hydrate_iterations must be at least 1, got {hydrate_iterations}")
    collections = discover_fixture_collections(fixtures_dir)

    reset_schema(repo_root, pypgstac_dir)

    report: dict[str, Any] = {
        "label": label,
        "repo_root": str(repo_root),
        "fixtures_dir": str(fixtures_dir),
        "hydrate_iterations": hydrate_iterations,
        "collections": [],
    }

    dsn = get_dsn()
    with psycopg.connect(dsn, autocommit=True) as conn:
        with conn.cursor() as cur:
            cur.execute("SET search_path TO pgstac, public;")

            for fixture in collections:
                insert_collection(cur, fixture.collection_path)

            for fixture in collections:
                rows, ingest_duration_ms = ingest_collection(cur, fixture)
                hydrate_rows, hydrate_avg_ms = hydrate_collection(
                    cur,
                    fixture.collection_id,
                    hydrate_iterations,
                )
                partition_key = get_partition_key(cur, fixture.collection_id)
                storage = get_collection_storage(cur, partition_key)

                report["collections"].append(
                    {
                        "collection_id": fixture.collection_id,
                        "ingest": {
                            "rows": rows,
                            "duration_ms": ingest_duration_ms,
                            "ms_per_item": per_item(ingest_duration_ms, rows),
                        },
                        "hydrate": {
                            "rows": hydrate_rows,
                            "avg_ms": hydrate_avg_ms,
                            "ms_per_item": per_item(hydrate_avg_ms, hydrate_rows),
                        },
                        "storage": {
                            **storage,
                            "bytes_per_item": per_item(float(storage["total_bytes"]), rows),
                        },
                    },
                )

            report["global_storage"] = get_global_storage(cur)

    return report


def main() -> int:
    """CLI entry point: run the benchmark and write ``<label>.json/.csv/.md``.

    Arguments are validated before any directory is created or any database
    work starts; invalid values exit through ``argparse`` with status 2.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--fixtures-dir",
        type=Path,
        required=True,
        help="Directory containing per-collection fixture folders.",
    )
    parser.add_argument(
        "--repo-root",
        type=Path,
        required=True,
        help="Repository root whose migration stack should be benchmarked.",
    )
    parser.add_argument(
        "--pypgstac-dir",
        type=Path,
        default=Path("src/pypgstac"),
        help="Path to the pypgstac project (relative to --repo-root if not absolute).",
    )
    parser.add_argument("--label", type=str, required=True)
    parser.add_argument("--hydrate-iterations", type=int, default=5)
    parser.add_argument("--output-dir", type=Path, required=True)
    args = parser.parse_args()

    if args.hydrate_iterations < 1:
        parser.error("--hydrate-iterations must be at least 1")
    # The label becomes the stem of the output files, so a value such as
    # "origin/main" would point into a directory that does not exist.
    if not args.label or Path(args.label).name != args.label:
        parser.error("--label is used as a file name and must not contain path separators")

    args.output_dir.mkdir(parents=True, exist_ok=True)

    pypgstac_dir = args.pypgstac_dir
    if not pypgstac_dir.is_absolute():
        pypgstac_dir = args.repo_root / pypgstac_dir

    report = run_benchmark(
        fixtures_dir=args.fixtures_dir,
        repo_root=args.repo_root,
        pypgstac_dir=pypgstac_dir,
        label=args.label,
        hydrate_iterations=args.hydrate_iterations,
    )

    json_path = args.output_dir / f"{args.label}.json"
    csv_path = args.output_dir / f"{args.label}.csv"
    md_path = args.output_dir / f"{args.label}.md"

    rows = flatten_metrics(report)

    json_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    write_csv(rows, csv_path)
    write_markdown(report, md_path)

    print(json.dumps(report, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())