diff --git a/.github/workflows/climate-update.yml b/.github/workflows/climate-update.yml index abc26c0..6e2dd25 100644 --- a/.github/workflows/climate-update.yml +++ b/.github/workflows/climate-update.yml @@ -11,7 +11,7 @@ jobs: timeout-minutes: 360 # 6 hours max env: - CDS_API_KEY: ${{ secrets.CDS_API_KEY }} + EDH_TOKEN: ${{ secrets.EDH_TOKEN }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_DEFAULT_REGION: us-west-2 @@ -24,26 +24,34 @@ jobs: use-public-rspm: true - uses: r-lib/actions/setup-r-dependencies@v2 - with: - extra-packages: any::ecmwfr - - name: Configure CDS API - run: | - echo "url: https://cds.climate.copernicus.eu/api" > ~/.cdsapirc - echo "key: $CDS_API_KEY" >> ~/.cdsapirc - chmod 600 ~/.cdsapirc + - name: Install uv (runs the Python backfill) + uses: astral-sh/setup-uv@v5 - - name: Set ecmwfr key + - name: Verify required secrets present run: | - Rscript -e 'ecmwfr::wf_set_key(key = Sys.getenv("CDS_API_KEY"))' - - - name: Run update pipeline + missing=() + [ -z "$EDH_TOKEN" ] && missing+=("EDH_TOKEN") + [ -z "$AWS_ACCESS_KEY_ID" ] && missing+=("AWS_ACCESS_KEY_ID") + [ -z "$AWS_SECRET_ACCESS_KEY" ] && missing+=("AWS_SECRET_ACCESS_KEY") + if [ ${#missing[@]} -gt 0 ]; then + echo "ERROR: missing secrets: ${missing[*]}" + exit 1 + fi + + - name: Run EDH update pipeline + # pipefail so Rscript's exit code propagates through `tee`; default + # bash -e alone would let tee swallow a non-zero R exit. + shell: bash run: | + set -o pipefail mkdir -p logs - Rscript scripts/pipeline_update.R 2>&1 | tee logs/update_$(date +%Y%m%d).log + Rscript scripts/pipeline_update_edh.R 2>&1 | tee logs/update_$(date +%Y%m%d).log + # Only commit logs when running on main. workflow_dispatch from a + # feature branch should not rebase onto or push to main. - name: Commit log - if: always() + if: always() && github.ref == 'refs/heads/main' run: | git config user.name "github-actions[bot]" git config user.email "github-actions[bot]@users.noreply.github.com" diff --git a/planning/active/findings.md b/planning/active/findings.md new file mode 100644 index 0000000..d5eed32 --- /dev/null +++ b/planning/active/findings.md @@ -0,0 +1,108 @@ +# Findings: EDH migration + +## EDH API basics (from #35 research and tonight's benchmark) + +**Zarr URL:** `https://data.earthdatahub.destine.eu/era5/reanalysis-era5-land-no-antartica-v0.zarr` + +**Auth:** Personal access token, passed inline as URL password: +``` +https://edh:@data.earthdatahub.destine.eu/... +``` +Alternative: `storage_options={"client_kwargs":{"trust_env":True}}` to read from `.netrc`. + +**Quota:** 500,000 requests/month per user (confirmed in EDH getting-started docs). + +**Coord conventions (from live Zarr inspection):** +- Dimensions: `valid_time`, `latitude`, `longitude` +- `latitude`: descending from ~90 to -60 (confirmed by working slice `slice(60, 48)`) +- `longitude`: **0 to 359.9** (0-360 convention, NOT -180 to 180) +- For BC bbox `c(60, -140, 48, -114)` → lon slice must be `220` to `246` +- `valid_time`: hourly, from 1950-01-01T00:00 to current month last day +- Variable names: CF-style short names (`t2m`, `tp`, `d2m`, `swvl1-4`, `u10`, `v10`, etc.) + +**50 variables available in single Zarr store:** +`asn, d2m, e, es, evabs, evaow, evatc, evavt, fal, lai_hv, lai_lv, lblt, licd, lict, lmld, lmlt, lshf, ltlt, pev, ro, rsn, sd, sde, sf, skt, slhf, smlt, snowc, sp, src, sro, sshf, ssr, ssrd, ssro, stl1, stl2, stl3, stl4, str, strd, swvl1, swvl2, swvl3, swvl4, t2m, tp, tsn, u10, v10` + +**Relevant variable mapping for cd package:** +| cd variable | EDH variable(s) | Units | Notes | +|---|---|---|---| +| tmean | `t2m` | K | Hourly; monthly mean computed client-side | +| tmax | `t2m` | K | Hourly; daily max → monthly mean of daily max | +| tmin | `t2m` | K | Hourly; daily min → monthly mean of daily min | +| prcp | `tp` | m | Hourly total precipitation; sum over month, × 1000 for mm | +| dewpoint | `d2m` | K | Hourly 2m dewpoint | +| soil_moisture | `swvl1`, `swvl2`, `swvl3`, `swvl4` | m³/m³ | 4 vertical layers | +| vpd | derived | hPa | From t2m + d2m (Tetens) | +| rh | derived | % | From t2m + d2m | + +## Performance (single month BC bbox) + +- Open Zarr store: 2.6-3.0s (metadata only) +- Subset + materialize 1 month × BC bbox × hourly t2m: 14.9-15.9s +- In-memory size: 92.9 MB for 744 hours × 120 lat × 260 lon float64 + +## Implications for pipeline + +- No rate limits means we can pull **all months in parallel** via asyncio/dask if wanted +- Zarr chunking is independent of month boundaries, so small time-slice pulls are efficient +- Can pull all 7 cd variables from the **same dataset open** — no need for separate fetches per variable + +## R/Zarr tooling options + +- **Reticulate + Python xarray** — proven; what `test_edh_era5_land.py` uses. Simple, works. +- **stars::read_mdim()** — GDAL zarr driver. Requires GDAL built with zarr, auth via URL password likely works. +- **pizzarr** — young R-native zarr package, may not support remote HTTPS stores. + +Decision deferred to Phase 3 after Phase 2 unblocks the data. + +## Known limitation: UTC-day aggregation for tmax/tmin + +Both the existing R pipeline (`pipeline_tmax_tmin_hourly.R`) and the new EDH +Python script compute daily max/min over UTC-day windows, not local-time days. +For BC (UTC-7/-8) the local-afternoon tmax peak straddles UTC day boundaries, +introducing a small systematic bias vs. a local-time daily aggregation. + +- CDS's `derived-era5-land-daily-statistics` product accepts a `time_zone` + parameter and would fix this — but we abandoned it in #33 due to its + ~2-hour-per-job queue time. +- Fixing properly requires shifting hourly timestamps by the local UTC offset + before `.resample("1D")`, or doing per-pixel local-time aggregation + (computationally expensive and overkill for a regional dataset). +- For now: the EDH script replicates the R pipeline's existing behavior so + outputs are directly comparable. Addressing this bias is a follow-up issue + (cd package methodology). + +## Precipitation accumulation (ERA5-Land tp) + +The hourly `tp` variable has `GRIB_stepType=accum` — it is a **running +accumulation** from 01:00 UTC reset to 00:00 UTC next day. Naive +`.sum()` over hourly values gives a monthly precip ~8× too high. + +**Solution:** use EDH's daily product +`https://data.earthdatahub.destine.eu/era5/era5-land-daily-utc-v1.zarr` +which pre-computes daily totals correctly. Monthly precip = sum of +daily totals. + +**Why not fix the hourly sum:** writing accumulation logic ourselves +is bug-prone. The daily product is what ECMWF designed for this +exact case. + +**Caveat:** daily product only has 14 variables (t2m, d2m, swvl1-2, tp, +wind, radiation, etc.) — no swvl3 or swvl4. So soil moisture stays on +the hourly product. tmean and dewpoint could use either; we use hourly +for consistency and because the 0.99 correlation (vs CDS) was already +validated. + +## EDH product catalogue (what's available for cd package) + +| Product | Zarr URL | Variables | Use | +|---|---|---|---| +| Hourly ERA5-Land | `reanalysis-era5-land-no-antartica-v0.zarr` | 50 | tmax/tmin, tmean, dewpoint, soil_moisture | +| Daily ERA5-Land (UTC) | `era5-land-daily-utc-v1.zarr` | 14 | prcp only | +| Monthly ERA5 (not Land) | — | - | NOT useful (ERA5 parent, 31 km) | + +## Out-of-scope confirmations + +- GEE has ERA5-Land but commercial license blocks us +- AWS / ARCO / WeatherBench all serve ERA5 (31 km), not ERA5-Land +- CDS-beta is same backend, same rate limits diff --git a/planning/active/progress.md b/planning/active/progress.md new file mode 100644 index 0000000..5d0c20e --- /dev/null +++ b/planning/active/progress.md @@ -0,0 +1,100 @@ +# Progress: EDH migration + +## Session 2026-04-12 (evening) + +**Context from prior day:** Got rate-limited twice by CDS. Researched alternatives (#35), benchmarked EDH Zarr at 5× faster with no rate limits and same data. Filed #36 for migration. Pivoting now. + +**Completed:** +- Branched off main: `36-edh-migration` +- Stashed and restored `scripts/test_edh_era5_land.py` on new branch +- Set up PWF files (this document, task_plan.md, findings.md) + +**Next:** +- Update climate-update.yml GHA to use EDH (replace CDS fetcher with + scripts/backfill_edh_all.py + stage 3) +- Write pipeline_update_edh (EDH incremental update, replaces + CDS-based pipeline_update.R) +- Land the PR + +## Session 2026-04-12 (Stage 3 to S3) + +**Completed:** +- Wrote `scripts/pipeline_stage3_edh.R` — slim Stage-3-only against + `data/backfill/monthly/` (avoids CDS fetch in existing pipeline_backfill.R) +- Built 35 COGs locally: 7 variables × (annual + 4 seasons) × 76 years +- STAC catalog built, 35 items +- S3 sync pushed all 35 COGs to s3://stac-era5-land (178 MB total, + replacing the April 6-7 CDS-era COGs) +- Catalog.json unchanged (byte-identical) because filenames + extents + + metadata didn't change — legitimate sync behavior, not a bug +- Verified end-to-end via /vsicurl: tmean_annual reads back with correct + 76 years, CRS, extent. BC annual mean 1950 = -1.42 C, 2024 = +1.85 C + (3.3 C warming in 74 years — the climate departure signal the package + is built to show, now live on S3 from EDH data) + +## Session 2026-04-12 (unified backfill run) + +**Completed:** +- Full 1950-2025 unified backfill of 5 non-tmax/tmin variables via EDH in ~4 hours + (10:19 - 14:20 wall time) +- 532 monthly TIFs total (7 vars × 76 years), all on matching EPSG:4326 120×260 grid +- CDS-era files safely preserved in `data/backfill/monthly/_cds_backup/` (375 files) +- Two transient errors handled well: + - 1989: TimeoutError → retry-with-backoff caught it, completed on attempt 2 + - 2008: ClientPayloadError → retry catch didn't include aiohttp class; outer handler + skipped the year and continued. Filled later via `--year 2008` rerun. +- QA confirmed: all grids aligned, tmin<=tmean<=tmax with zero violations, + physically plausible values, soil_moisture composite correct. + +## Session 2026-04-12 (overnight run) + +**Completed:** +- Full 1950-2025 backfill via EDH, ~1h 53min unattended (02:16 - 04:09) + - 75 year-files written (1951-2025) plus 1950 from earlier test = 76 years + - Avg ~90s per year, no failures +- Regenerated 2024 from EDH (was a CDS leftover from April 7 with different methodology) +- Filed #37 for the UTC-day aggregation bias (not blocking #36) +- All 76 years × 2 variables × 12 months × BC bbox GeoTIFFs in data/backfill/monthly/ + +**Validation (R/terra spot check):** +- 1950 Jul tmax up to 29.7 °C — historically realistic +- 2000 Jul tmax up to 28.9 °C +- 2024 Jul tmax up to 34.6 °C — matches heat dome era +- 2025 Jul tmax up to 31.9 °C +- All years: 12 layers, Jan..Dec names, EPSG:4326, BC bbox extent + +## Session 2026-04-12 (morning QA) + +**Completed:** +- Wrote `scripts/qa_monthly.R` — checks grid alignment, time coverage, + physical sanity across all variables +- Wrote `scripts/probe_edh_vars.py` — pulls one month of each EDH + variable, compares to CDS, validates aggregation semantics +- **Discovered grid mismatch:** CDS-era vars on 121×261 grid with no CRS + tag, vs EDH-era tmax/tmin on 120×260 EPSG:4326. Blocks release. +- **Validated EDH semantics:** + - tmean (hourly t2m mean): 0.99 correlation with CDS, 0.43 °C mean abs diff + - dewpoint: plausible BC winter values + - soil_moisture: all 4 depths match CDS composite closely + - prcp: hourly `tp` is accumulation (`GRIB_stepType=accum`) — need + EDH daily product for correct monthly precip +- **Decision:** two-Zarr approach, extend backfill to all variables so + the whole dataset is internally grid-aligned + +**Next:** +- Commit QA + probe scripts on the branch (durable tooling) +- Extend backfill script to all 7 cd variables +- Regenerate `data/backfill/monthly/` entirely +- Re-run QA → R Stage 3 → PR + +**Commits this session:** +- `0014bf2` — Add planning docs for EDH migration +- `7ef03cb` — Add EDH Zarr benchmark test script +- _pending_ — Add EDH-based tmax/tmin backfill script + +**Verified 2026-04-12 02:11:** +- EDH Zarr benchmark: 15.9s / month BC t2m (5× faster than CDS) +- EDH full-year backfill: 114.3s / year BC tmax/tmin as GeoTIFF +- Output drop-in replacement for existing R Stage 2: 12 layers Jan..Dec, + EPSG:4326, °C, BC bbox — `terra::rast()` reads cleanly with expected names +- Realistic BC values for 1950: Jan tmax -30.5 to 0.7°C, Jul tmax -1.2 to 29.7°C diff --git a/planning/active/task_plan.md b/planning/active/task_plan.md new file mode 100644 index 0000000..f69bfc6 --- /dev/null +++ b/planning/active/task_plan.md @@ -0,0 +1,88 @@ +# Task Plan: Migrate cd_fetch() to DestinE Earth Data Hub Zarr + +Issue: NewGraphEnvironment/cd#36 +Branch: `36-edh-migration` + +## Context + +CDS (`ecmwfr`) rate limiting makes the tmax/tmin backfill take ~3 days of babysitting. Benchmark in #35 showed EDH Zarr delivers the same ERA5-Land data at ~15s/month (5× faster, no rate limits, 500K requests/month quota). Pivot the producer pipeline to EDH. + +## Phase 1: Scaffold and benchmark (mostly done from #35) + +- [x] Confirm EDH carries ERA5-Land at 9 km native (validated #35) +- [x] Confirm temporal coverage 1950 to present (validated #35) +- [x] Confirm commercial license (CC-BY 4.0, validated #35) +- [x] Write portable PEP 723 test script (`scripts/test_edh_era5_land.py`) +- [x] Bench one month BC t2m — 15.9s vs CDS 80s +- [x] Commit test script on the branch + +## Phase 2: Pragmatic Python backfill (unblock #33) + +The quickest path to finishing the tmax/tmin data: a Python script that uses EDH directly. +Runs outside R, produces monthly GRIB or NetCDF that downstream R stages already consume. + +- [x] Write `scripts/backfill_edh_tmax_tmin.py` — tmax/tmin only for now (unblocks #33) +- [x] Output matches existing R pipeline's Stage 2 format (yearly `.tif` × 12 month bands, °C, EPSG:4326) +- [x] Band descriptions Jan..Dec so `cd_aggregate()` seasonal grouping works +- [x] Idempotent — skip years where both output files exist +- [x] Test on one year (1950) — 114s, realistic values, terra reads correctly +- [x] Run full backfill 1950-2025 — completed in ~1h 53min (76 years × tmax + tmin) +- [x] Regenerate 2024 from EDH (was a CDS leftover) for homogeneous methodology +- [x] QA — discovered CDS-era vars on a different grid (ext shifted 0.1°, CRS missing, 121×261 vs EDH 120×260) +- [x] Probe EDH products — confirmed two-Zarr approach: hourly for state vars, daily for prcp + +## Phase 2b: Unified backfill (all variables on EDH grid) + +Grid mismatch between CDS-era vars and EDH-era tmax/tmin blocks release. +Regenerating everything from EDH gives one internally-consistent dataset. + +- [x] Extend backfill to all 7 cd variables (scripts/backfill_edh_all.py) +- [x] Regenerate all `data/backfill/monthly/*_YYYY.tif` with consistent grid + CRS +- [x] Re-run QA — all grids aligned, all CRS tagged, tmin<=tmean<=tmax sanity passes +- [x] VPD/RH derived in Python (Tetens), no R cd_derive re-run needed + +**QA summary 2026-04-12:** +- 7 variables × 76 years = 532 monthly TIFs +- All on 120×260 EPSG:4326 grid, extent [-139.95, -113.95, 47.95, 59.95] +- Zero tmin>tmax or tmean inversion violations (163,888 cell-checks) +- (tmax+tmin)/2 vs tmean mean diff = 0.57°C (classical climatology shortcut bias, normal) +- 2008 had a ClientPayloadError that my retry didn't catch (wrong exception class); + re-ran --year 2008 to fill the gap. Filed as future improvement in #38. + +## Phase 2c: R Stage 3 + +- [x] Run R stage 3 (COG + STAC + S3) against the unified EDH-generated TIFs + (scripts/pipeline_stage3_edh.R; 35 COGs live on s3://stac-era5-land, + verified via /vsicurl read) + +## Phase 3: R integration for cd_fetch() + +- [ ] Decide: reticulate + Python xarray, OR stars::read_mdim() via GDAL zarr driver +- [ ] Prototype both on one month, compare simplicity and performance +- [ ] Refactor `cd_fetch()` with `source = c("edh", "cds")` parameter, default `"edh"` +- [ ] Keep `cd_fetch()` CDS path for fallback +- [ ] Update tests to exercise both paths +- [ ] Update examples and docs + +## Phase 4: Pipeline and docs + +- [ ] Update `scripts/pipeline_backfill.R` to use EDH source +- [ ] Update monthly GitHub Action to use EDH +- [ ] Add `EDH_TOKEN` to repo secrets (rotate current token first) +- [ ] Update CLAUDE.md — CDS section → EDH primary, CDS fallback +- [ ] Update README / pkgdown auth setup instructions + +## Phase 5: Close out + +- [ ] Run full backfill from scratch via integrated R path — validate output matches phase 2 +- [ ] Close #33 (tmax/tmin saga resolved by EDH) +- [ ] Close #35 (evaluation → migration done) +- [ ] Merge PR closing #36 +- [ ] Archive planning to `planning/completed/` + +## Success criteria + +- `cd_fetch()` default path uses EDH; CDS path works as fallback +- Full 1950-2025 backfill completes in under 8 hours (single session, unattended) +- Vignette and tests pass with EDH as source +- CDS API knowledge preserved in CLAUDE.md but marked as fallback diff --git a/scripts/backfill_edh_all.py b/scripts/backfill_edh_all.py new file mode 100644 index 0000000..9d8bfcc --- /dev/null +++ b/scripts/backfill_edh_all.py @@ -0,0 +1,329 @@ +#!/usr/bin/env -S uv run --script +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "xarray", +# "zarr", +# "fsspec", +# "aiohttp", +# "requests", +# "dask", +# "numpy", +# "rioxarray", +# "rasterio", +# ] +# /// +""" +Unified EDH backfill for all cd package variables (1950-2025). + +Produces data/backfill/monthly/*.tif in a single consistent grid (EPSG:4326, +BC bbox, 120x260) with proper CRS tagging, so cd_extract() returns aligned +pixels across variables. + +Output (per year): + tmax_YYYY.tif, tmin_YYYY.tif °C (hourly t2m → daily max/min → monthly mean) + tmean_YYYY.tif °C (hourly t2m → monthly mean) + vpd_YYYY.tif hPa (Tetens from tmean + dewpoint) + rh_YYYY.tif % (from tmean + dewpoint) + prcp_YYYY.tif mm (DAILY product tp → monthly sum × 1000) + soil_moisture_YYYY.tif m3/m3 (hourly swvl1..4 → monthly mean → 4-depth mean) + +Idempotent per (variable, year): skips outputs that already exist. + +Uses TWO EDH Zarr stores: + - Hourly `reanalysis-era5-land-no-antartica-v0.zarr` for all state variables + - Daily `era5-land-daily-utc-v1.zarr` for precipitation only + (because ERA5-Land hourly `tp` has GRIB_stepType=accum and naive summing + produces wildly wrong results — the daily product handles the reset). + +Usage: + uv run scripts/backfill_edh_all.py # full backfill + uv run scripts/backfill_edh_all.py --year 2000 # single year test +""" +import argparse +import os +import subprocess +import sys +import time +from pathlib import Path + +import numpy as np +import rasterio +import rioxarray # noqa: F401 — registers .rio accessor +import xarray as xr + +# -- Config -------------------------------------------------------------------- +LAT_N, LAT_S = 60.0, 48.0 +LON_W, LON_E = -140.0, -114.0 + +YEARS_DEFAULT = range(1950, 2026) + +REPO_ROOT = Path(__file__).resolve().parent.parent +MONTHLY_DIR = REPO_ROOT / "data" / "backfill" / "monthly" + +MONTH_NAMES = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", + "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] + + +# -- Helpers ------------------------------------------------------------------- +def preflight_single_instance(): + """Refuse to start if another backfill_edh_all.py is already running. + + Protects against the CDS-era hammering incident where we had zombie + processes stacking up (see #33). Skipped on CI because each GHA run + is in a fresh container — no other instances are possible — and the + pgrep check has false positives there (uv wrapper, shell ancestors, + pgrep's own pre-exec cmdline) that are not worth chasing. + """ + if os.environ.get("GITHUB_ACTIONS") == "true": + return + + my_pid = os.getpid() + my_ppid = os.getppid() + try: + out = subprocess.run( + ["pgrep", "-f", "backfill_edh_all"], + capture_output=True, text=True, check=False, + ) + pids = [int(p) for p in out.stdout.strip().splitlines() + if p.strip() and int(p) not in (my_pid, my_ppid)] + except (FileNotFoundError, ValueError): + pids = [] + if pids: + sys.exit(f"ABORT: another backfill_edh_all is running (pids: {pids}). " + f"Kill them first: kill {' '.join(str(p) for p in pids)}") + + +def with_retry(fn, *, attempts: int = 4, initial_delay: float = 10.0, + what: str = "operation"): + """Run `fn()` with exponential backoff on transient errors. + + EDH is chunk-based (no job queue), so network blips are the main failure + mode. Retry on OSError, ConnectionError, TimeoutError. Let other errors + (KeyError, ValueError etc) propagate — those are bugs, not transient. + """ + delay = initial_delay + for i in range(1, attempts + 1): + try: + return fn() + except (OSError, ConnectionError, TimeoutError) as e: + if i == attempts: + raise + log(f" {what} failed (attempt {i}/{attempts}): {type(e).__name__}: {e}. " + f"Retrying in {delay:.0f}s...") + time.sleep(delay) + delay *= 2 + + +def get_token() -> str: + token = os.environ.get("EDH_TOKEN") + if token: + return token + renviron = Path.home() / ".Renviron" + if renviron.exists(): + for line in renviron.read_text().splitlines(): + if line.strip().startswith("EDH_TOKEN="): + return line.strip().split("=", 1)[1] + sys.exit("EDH_TOKEN not found in env or ~/.Renviron") + + +def open_zarr(url_path: str, token: str) -> xr.Dataset: + url = f"https://edh:{token}@data.earthdatahub.destine.eu/{url_path}" + return xr.open_dataset(url, chunks={}, engine="zarr") + + +def bc_slice(ds: xr.Dataset, start: str, end: str) -> dict: + lon_min = float(ds.longitude.min()) + if lon_min >= 0: + bc_west, bc_east = LON_W + 360, LON_E + 360 + else: + bc_west, bc_east = LON_W, LON_E + return dict( + valid_time=slice(start, end), + latitude=slice(LAT_N, LAT_S), + longitude=slice(bc_west, bc_east), + ) + + +def tetens_es(t_c): + """Saturation vapour pressure (hPa) given temperature in °C (Tetens).""" + return 6.1078 * np.exp(17.27 * t_c / (t_c + 237.3)) + + +def write_geotiff(da: xr.DataArray, out_path: Path): + """Write an xarray DataArray with (valid_time, latitude, longitude) dims + as a multi-band EPSG:4326 GeoTIFF with Jan..Dec band descriptions. + + Atomic: writes to a .tmp suffix then renames, so a killed run never + leaves a truncated file that passes the existence check on restart. + """ + da = da.rename({"valid_time": "band"}).assign_coords(band=MONTH_NAMES) + # Translate longitude back to -180..180 if needed + if float(da.longitude.max()) > 180: + new_lon = da.longitude.where(da.longitude <= 180, da.longitude - 360) + da = da.assign_coords(longitude=new_lon).sortby("longitude") + da = da.rename({"longitude": "x", "latitude": "y"}) + da.rio.write_crs("EPSG:4326", inplace=True) + + tmp_path = out_path.with_suffix(out_path.suffix + ".tmp") + try: + da.rio.to_raster(tmp_path, driver="GTiff") + # Per-band descriptions so terra::rast() picks up Jan..Dec names + with rasterio.open(tmp_path, "r+") as dst: + dst.descriptions = tuple(MONTH_NAMES) + os.replace(tmp_path, out_path) + except Exception: + if tmp_path.exists(): + tmp_path.unlink() + raise + + +def log(msg: str): + print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True) + + +# -- Per-year processing ------------------------------------------------------- +def outputs_for_year(year: int) -> dict: + """Map output variable name to Path.""" + return {v: MONTHLY_DIR / f"{v}_{year}.tif" for v in ( + "tmax", "tmin", "tmean", "vpd", "rh", "prcp", "soil_moisture")} + + +def process_year(year: int, hourly_ds: xr.Dataset, daily_ds: xr.Dataset): + out = outputs_for_year(year) + + # Which outputs are missing? + needed = {v: p for v, p in out.items() if not p.exists()} + if not needed: + log(f"{year}: all outputs exist, skipping") + return + + log(f"{year}: needed = {sorted(needed)}") + t_year = time.time() + + # Hourly subset for the full year (all the state variables we need) + hourly_box = bc_slice(hourly_ds, f"{year}-01-01", f"{year}-12-31T23:00") + needed_hourly_vars = [] + if any(v in needed for v in ("tmax", "tmin", "tmean", "vpd", "rh")): + needed_hourly_vars.append("t2m") + if any(v in needed for v in ("vpd", "rh")): + needed_hourly_vars.append("d2m") + if "soil_moisture" in needed: + needed_hourly_vars.extend(["swvl1", "swvl2", "swvl3", "swvl4"]) + + hourly_sub = hourly_ds[needed_hourly_vars].sel(**hourly_box) if needed_hourly_vars else None + + # -- tmax / tmin (daily max/min → monthly mean of daily stat) ------------ + if "tmax" in needed or "tmin" in needed: + t2m = hourly_sub["t2m"] + if "tmax" in needed: + daily_max = t2m.resample(valid_time="1D").max() + monthly_tmax = (daily_max.resample(valid_time="1MS").mean() - 273.15).compute() + if monthly_tmax.sizes["valid_time"] == 12: + write_geotiff(monthly_tmax, out["tmax"]) + log(f" wrote {out['tmax'].name}") + else: + log(f" SKIP tmax: got {monthly_tmax.sizes['valid_time']} months, expected 12") + if "tmin" in needed: + daily_min = t2m.resample(valid_time="1D").min() + monthly_tmin = (daily_min.resample(valid_time="1MS").mean() - 273.15).compute() + if monthly_tmin.sizes["valid_time"] == 12: + write_geotiff(monthly_tmin, out["tmin"]) + log(f" wrote {out['tmin'].name}") + else: + log(f" SKIP tmin: got {monthly_tmin.sizes['valid_time']} months, expected 12") + + # -- tmean (hourly t2m → monthly mean) ----------------------------------- + if "tmean" in needed: + monthly_tmean = (hourly_sub["t2m"].resample(valid_time="1MS").mean() - 273.15).compute() + if monthly_tmean.sizes["valid_time"] == 12: + write_geotiff(monthly_tmean, out["tmean"]) + log(f" wrote {out['tmean'].name}") + else: + log(f" SKIP tmean: got {monthly_tmean.sizes['valid_time']} months, expected 12") + + # -- vpd / rh (Tetens from monthly mean of tmean + dewpoint) ------------- + # Use monthly-mean tmean and dewpoint as inputs (same as R cd_derive path) + if "vpd" in needed or "rh" in needed: + monthly_t_c = (hourly_sub["t2m"].resample(valid_time="1MS").mean() - 273.15).compute() + monthly_td_c = (hourly_sub["d2m"].resample(valid_time="1MS").mean() - 273.15).compute() + es = tetens_es(monthly_t_c) + ea = tetens_es(monthly_td_c) + n = monthly_t_c.sizes["valid_time"] + if "vpd" in needed: + if n == 12: + vpd = (es - ea).clip(min=0) + write_geotiff(vpd, out["vpd"]) + log(f" wrote {out['vpd'].name}") + else: + log(f" SKIP vpd: got {n} months, expected 12") + if "rh" in needed: + if n == 12: + rh = (100 * ea / es).clip(min=0, max=100) + write_geotiff(rh, out["rh"]) + log(f" wrote {out['rh'].name}") + else: + log(f" SKIP rh: got {n} months, expected 12") + + # -- soil_moisture (hourly swvl1..4 → monthly mean → 4-depth mean) ------- + if "soil_moisture" in needed: + depths = [hourly_sub[f"swvl{d}"].resample(valid_time="1MS").mean() + for d in (1, 2, 3, 4)] + monthly_sm = xr.concat(depths, dim="depth").mean(dim="depth").compute() + if monthly_sm.sizes["valid_time"] == 12: + write_geotiff(monthly_sm, out["soil_moisture"]) + log(f" wrote {out['soil_moisture'].name}") + else: + log(f" SKIP soil_moisture: got {monthly_sm.sizes['valid_time']} months, expected 12") + + # -- prcp (DAILY product tp → monthly sum × 1000) ------------------------ + if "prcp" in needed: + daily_box = bc_slice(daily_ds, f"{year}-01-01", f"{year}-12-31") + tp_daily = daily_ds["tp"].sel(**daily_box) + monthly_prcp_m = tp_daily.resample(valid_time="1MS").sum() + monthly_prcp_mm = (monthly_prcp_m * 1000).compute() + if monthly_prcp_mm.sizes["valid_time"] == 12: + write_geotiff(monthly_prcp_mm, out["prcp"]) + log(f" wrote {out['prcp'].name}") + else: + log(f" SKIP prcp: got {monthly_prcp_mm.sizes['valid_time']} months, expected 12") + + log(f"{year}: done in {time.time() - t_year:.1f}s") + + +# -- Main ---------------------------------------------------------------------- +def main(years): + preflight_single_instance() + MONTHLY_DIR.mkdir(parents=True, exist_ok=True) + token = get_token() + + log("Opening hourly Zarr (reanalysis-era5-land-no-antartica-v0)...") + hourly_ds = with_retry( + lambda: open_zarr("era5/reanalysis-era5-land-no-antartica-v0.zarr", token), + what="open hourly zarr", + ) + log("Opening daily Zarr (era5-land-daily-utc-v1)...") + daily_ds = with_retry( + lambda: open_zarr("era5/era5-land-daily-utc-v1.zarr", token), + what="open daily zarr", + ) + + for year in years: + try: + with_retry( + lambda y=year: process_year(y, hourly_ds, daily_ds), + what=f"process year {year}", + ) + except Exception as e: + log(f"FAILED year {year} after retries: {type(e).__name__}: {e}") + log("Continuing to next year (idempotent — restart to retry this one)") + + log("ALL DONE") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--year", type=int, help="Single year to backfill (for testing)") + args = parser.parse_args() + years = [args.year] if args.year else YEARS_DEFAULT + main(years) diff --git a/scripts/backfill_edh_tmax_tmin.py b/scripts/backfill_edh_tmax_tmin.py new file mode 100644 index 0000000..d72c26a --- /dev/null +++ b/scripts/backfill_edh_tmax_tmin.py @@ -0,0 +1,173 @@ +#!/usr/bin/env -S uv run --script +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "xarray", +# "zarr", +# "fsspec", +# "aiohttp", +# "requests", +# "dask", +# "numpy", +# "rioxarray", +# "rasterio", +# ] +# /// +""" +Full-backfill EDH pipeline for tmax/tmin over BC (1950-2025). + +Replaces the CDS-based pipeline_tmax_tmin_hourly.R stages 1 and 2: + 1. Pull hourly 2m_temperature from DestinE Earth Data Hub (Zarr) + 2. Compute daily max/min, then monthly mean of daily max/min + 3. Write per-year GeoTIFFs with 12 month-layers each, °C + +Output drop-in replacement for data/backfill/monthly/: + tmax_YYYY.tif (12 layers: Jan..Dec, degrees C, EPSG:4326, BC bbox) + tmin_YYYY.tif (same) + +The existing R script's Stage 3 (COG aggregation, STAC catalog, S3 push) +can then run unchanged from these outputs. + +KNOWN LIMITATION: daily max/min is computed over UTC-day windows. For BC +(UTC-8 winter, UTC-7 summer) the local-afternoon tmax peak can straddle +UTC day boundaries, producing a small systematic bias vs. a local-time +daily aggregation. This matches the behaviour of the existing R pipeline +(scripts/pipeline_tmax_tmin_hourly.R) so outputs are comparable. CDS's +derived-era5-land-daily-statistics product accepts a time_zone parameter +to fix this, but we abandoned it due to rate limits (see #33). + +Idempotent — skips years whose tmax_YYYY.tif and tmin_YYYY.tif already exist. +Partial years (in-progress current year) are caught by the `n_months != 12` +guard and skipped with a warning. + +Usage: + uv run scripts/backfill_edh_tmax_tmin.py # full backfill + uv run scripts/backfill_edh_tmax_tmin.py --year 1950 # single year test +""" +import argparse +import os +import sys +import time +from pathlib import Path + +import numpy as np +import rasterio +import xarray as xr + +# -- Config -------------------------------------------------------------------- +# BC bbox, matches scripts/pipeline_tmax_tmin_hourly.R +LAT_N, LAT_S = 60.0, 48.0 +LON_W, LON_E = -140.0, -114.0 # will translate to 0-360 for EDH + +YEARS_DEFAULT = range(1950, 2026) + +REPO_ROOT = Path(__file__).resolve().parent.parent +MONTHLY_DIR = REPO_ROOT / "data" / "backfill" / "monthly" + +MONTH_NAMES = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", + "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] + + +# -- Token --------------------------------------------------------------------- +def get_token() -> str: + token = os.environ.get("EDH_TOKEN") + if token: + return token + renviron = Path.home() / ".Renviron" + if renviron.exists(): + for line in renviron.read_text().splitlines(): + if line.strip().startswith("EDH_TOKEN="): + return line.strip().split("=", 1)[1] + sys.exit("EDH_TOKEN not found in env or ~/.Renviron") + + +# -- Main ---------------------------------------------------------------------- +def main(years): + MONTHLY_DIR.mkdir(parents=True, exist_ok=True) + + token = get_token() + zarr_url = ( + f"https://edh:{token}@data.earthdatahub.destine.eu/era5/" + "reanalysis-era5-land-no-antartica-v0.zarr" + ) + + print(f"[{time.strftime('%H:%M:%S')}] Opening EDH Zarr store...") + t0 = time.time() + ds = xr.open_dataset(zarr_url, chunks={}, engine="zarr") + print(f" Opened in {time.time() - t0:.1f}s") + + # Longitude convention + lon_min = float(ds.longitude.min()) + if lon_min >= 0: + bc_west, bc_east = LON_W + 360, LON_E + 360 + else: + bc_west, bc_east = LON_W, LON_E + + for year in years: + tmax_out = MONTHLY_DIR / f"tmax_{year}.tif" + tmin_out = MONTHLY_DIR / f"tmin_{year}.tif" + + if tmax_out.exists() and tmin_out.exists(): + print(f"[{time.strftime('%H:%M:%S')}] {year}: exists, skipping") + continue + + print(f"[{time.strftime('%H:%M:%S')}] {year}: fetching...") + t_year = time.time() + + # Pull entire year of hourly t2m for BC + hourly = ds["t2m"].sel( + valid_time=slice(f"{year}-01-01", f"{year}-12-31T23:00"), + latitude=slice(LAT_N, LAT_S), + longitude=slice(bc_west, bc_east), + ) + + # Daily max/min, then monthly mean of each → 12 layers per year + # resample labels: '1D' → daily, '1MS' → month start + daily_max = hourly.resample(valid_time="1D").max() + daily_min = hourly.resample(valid_time="1D").min() + monthly_tmax = daily_max.resample(valid_time="1MS").mean() - 273.15 + monthly_tmin = daily_min.resample(valid_time="1MS").mean() - 273.15 + + # Materialize + monthly_tmax = monthly_tmax.compute() + monthly_tmin = monthly_tmin.compute() + + n_months = monthly_tmax.sizes["valid_time"] + if n_months != 12: + print(f" WARNING: {year} has {n_months} months, expected 12 — skipping") + continue + + # Name layers Jan..Dec, translate longitude back to -180..180 for GeoTIFF + def to_geotiff_raster(da, out_path): + da = da.rename({"valid_time": "band"}) + da = da.assign_coords(band=MONTH_NAMES) + # Translate longitude back if needed + if float(da.longitude.max()) > 180: + new_lon = da.longitude.where(da.longitude <= 180, da.longitude - 360) + da = da.assign_coords(longitude=new_lon).sortby("longitude") + # rioxarray expects 'x' and 'y' dim names + da = da.rename({"longitude": "x", "latitude": "y"}) + da.rio.write_crs("EPSG:4326", inplace=True) + da.rio.to_raster(out_path) + # Set per-band descriptions so terra::rast() picks up Jan..Dec names + # (required by cd_aggregate() for seasonal grouping) + with rasterio.open(out_path, "r+") as dst: + dst.descriptions = tuple(MONTH_NAMES) + + import rioxarray # noqa: F401 — registers `.rio` accessor + + to_geotiff_raster(monthly_tmax, tmax_out) + to_geotiff_raster(monthly_tmin, tmin_out) + + elapsed = time.time() - t_year + print(f" Wrote {tmax_out.name} and {tmin_out.name} in {elapsed:.1f}s") + + print(f"[{time.strftime('%H:%M:%S')}] DONE") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--year", type=int, help="Single year to backfill (for testing)") + args = parser.parse_args() + years = [args.year] if args.year else YEARS_DEFAULT + main(years) diff --git a/scripts/pipeline_stage3_edh.R b/scripts/pipeline_stage3_edh.R new file mode 100644 index 0000000..6cc9ef6 --- /dev/null +++ b/scripts/pipeline_stage3_edh.R @@ -0,0 +1,110 @@ +#!/usr/bin/env Rscript +# +# pipeline_stage3_edh.R +# +# Stage 3 of the EDH-based backfill: take the unified monthly TIFs in +# data/backfill/monthly/, aggregate to seasonal/annual COGs, write a STAC +# catalog, and push everything to S3. +# +# Assumes Stage 2 (scripts/backfill_edh_all.py) has produced: +# data/backfill/monthly/{tmax,tmin,tmean,prcp,vpd,rh,soil_moisture}_YYYY.tif +# — 12 layers each (Jan..Dec), EPSG:4326, BC bbox. +# +# Usage: +# Rscript scripts/pipeline_stage3_edh.R +# Rscript scripts/pipeline_stage3_edh.R --dry-run # no S3 push + +if (requireNamespace("cd", quietly = TRUE)) library(cd) else devtools::load_all() +suppressMessages(library(terra)) + +args <- commandArgs(trailingOnly = TRUE) +dry_run <- "--dry-run" %in% args + +# -- Config -------------------------------------------------------------------- +bucket <- "stac-era5-land" +monthly_dir <- "data/backfill/monthly" +cog_dir <- "data/backfill/cogs" +seasons <- cd_seasons() + +agg_methods <- c( + tmean = "mean", tmax = "mean", tmin = "mean", + prcp = "sum", vpd = "mean", rh = "mean", soil_moisture = "mean" +) + +dir.create(cog_dir, recursive = TRUE, showWarnings = FALSE) + +log_msg <- function(...) { + cat(sprintf("[%s] %s\n", format(Sys.time(), "%H:%M:%S"), paste0(...))) +} + +# -- Step 1: Aggregate to seasonal/annual COGs -------------------------------- +log_msg("=== STEP 1: Aggregate monthly -> seasonal/annual COGs ===") + +all_vars <- names(agg_methods) + +for (var in all_vars) { + method <- agg_methods[[var]] + + # Find all monthly files for this variable + monthly_files <- list.files( + monthly_dir, + pattern = paste0("^", var, "_\\d{4}\\.tif$"), + full.names = TRUE + ) + if (length(monthly_files) == 0) { + log_msg(" ", var, ": no monthly files, skipping") + next + } + years <- sort(as.integer( + sub(paste0(var, "_(\\d{4})\\.tif"), "\\1", basename(monthly_files)) + )) + + log_msg(sprintf(" %s: %d years (%d-%d), method=%s", + var, length(years), min(years), max(years), method)) + + for (period in c("annual", names(seasons))) { + cog_path <- file.path(cog_dir, paste0(var, "_", period, ".tif")) + year_layers <- list() + + for (yr in years) { + mf <- file.path(monthly_dir, paste0(var, "_", yr, ".tif")) + r_m <- rast(mf) + if (nlyr(r_m) != 12) { + warning(sprintf("%s %d: has %d layers, need 12, skipping", + var, yr, nlyr(r_m)), call. = FALSE) + next + } + periods <- cd_aggregate(r_m, method = method, seasons = seasons) + if (period %in% names(periods)) { + year_layers[[as.character(yr)]] <- periods[[period]] + } + } + + if (length(year_layers) == 0) next + + multi <- rast(year_layers) + names(multi) <- names(year_layers) + cd_cog_write(multi, cog_path, overwrite = TRUE) + log_msg(sprintf(" wrote %s (%d years)", basename(cog_path), nlyr(multi))) + } +} + +# -- Step 2: STAC catalog ------------------------------------------------------ +log_msg("=== STEP 2: Build STAC catalog ===") +cd_stac_catalog( + cog_dir, + output_path = file.path(cog_dir, "catalog.json"), + base_url = paste0("https://", bucket, ".s3.us-west-2.amazonaws.com") +) +log_msg(" wrote ", file.path(cog_dir, "catalog.json")) + +# -- Step 3: S3 push ----------------------------------------------------------- +log_msg("=== STEP 3: Push to S3 ===") +if (dry_run) { + log_msg(" DRY RUN — showing what would be uploaded:") + cd_s3_push(cog_dir, bucket = bucket, dry_run = TRUE) +} else { + cd_s3_push(cog_dir, bucket = bucket, dry_run = FALSE) +} + +log_msg("=== DONE ===") diff --git a/scripts/pipeline_update_edh.R b/scripts/pipeline_update_edh.R new file mode 100644 index 0000000..f662fca --- /dev/null +++ b/scripts/pipeline_update_edh.R @@ -0,0 +1,193 @@ +#!/usr/bin/env Rscript +# +# pipeline_update_edh.R +# +# Incremental monthly update via DestinE Earth Data Hub. +# Replaces the CDS-based pipeline_update.R. +# +# Flow: +# 1. Read STAC catalog from S3 → find latest year already published +# 2. Determine target year (latest complete year available on EDH) +# 3. If behind, call scripts/backfill_edh_all.py for each missing year +# (idempotent — Python script skips files that already exist) +# 4. For each variable × period, read existing COG from S3 via /vsicurl, +# aggregate the new year's monthly TIFs, append to the COG, write +# locally, push to S3 +# 5. Rebuild catalog, push to S3 +# +# Designed for the monthly GitHub Action (climate-update.yml). Exits +# cleanly with status 0 if nothing new is available. +# +# Prerequisites: +# - EDH_TOKEN in env or ~/.Renviron +# - AWS CLI configured (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, +# AWS_DEFAULT_REGION=us-west-2) +# - uv installed (for running the Python backfill) +# +# Usage: +# Rscript scripts/pipeline_update_edh.R + +if (requireNamespace("cd", quietly = TRUE)) library(cd) else devtools::load_all() +suppressMessages(library(terra)) + +# -- Config -------------------------------------------------------------------- +bucket <- "stac-era5-land" +catalog_url <- paste0("https://", bucket, ".s3.us-west-2.amazonaws.com/catalog.json") +monthly_dir <- "data/backfill/monthly" +cog_dir <- "data/update/cogs" +seasons <- cd_seasons() + +agg_methods <- c( + tmean = "mean", tmax = "mean", tmin = "mean", + prcp = "sum", vpd = "mean", rh = "mean", soil_moisture = "mean" +) + +dir.create(monthly_dir, recursive = TRUE, showWarnings = FALSE) +dir.create(cog_dir, recursive = TRUE, showWarnings = FALSE) + +log_msg <- function(...) { + cat(sprintf("[%s] %s\n", format(Sys.time(), "%Y-%m-%d %H:%M:%S"), paste0(...))) +} + +# -- Step 1: determine state --------------------------------------------------- +log_msg("=== STEP 1: Check S3 catalog for latest year ===") + +catalog <- tryCatch( + cd_catalog(catalog_url), + error = function(e) { + log_msg("No catalog at ", catalog_url, " — run full backfill first (scripts/backfill_edh_all.py + pipeline_stage3_edh.R)") + quit(status = 1) + } +) + +# Read one COG to find latest year +tmean_row <- catalog[catalog$variable == "tmean" & catalog$period == "annual", ] +if (nrow(tmean_row) == 0) { + log_msg("No tmean_annual in catalog — run full backfill first") + quit(status = 1) +} +r_current <- rast(paste0("/vsicurl/", tmean_row$href)) +current_years <- as.integer(names(r_current)) +latest_year <- max(current_years, na.rm = TRUE) +log_msg("Latest year on S3: ", latest_year) + +# -- Step 2: target year ------------------------------------------------------ +# ERA5-Land has ~2-3 month latency. Try the current year — if EDH has all +# 12 months, backfill_edh_all.py writes; otherwise it skips cleanly and we +# move on. Also try latest_year + 1 in case we're behind for another reason. +current_year <- as.integer(format(Sys.Date(), "%Y")) +if (latest_year >= current_year) { + log_msg("Already at or past current year (", latest_year, " >= ", current_year, ")") + log_msg("Nothing to do.") + quit(status = 0) +} +candidate_years <- seq(latest_year + 1, current_year) +log_msg("Candidate years to fetch: ", paste(candidate_years, collapse = ", ")) + +# -- Step 3: fetch via EDH ---------------------------------------------------- +log_msg("=== STEP 3: Fetch missing years via EDH ===") + +new_years_written <- c() +any_fetch_errored <- FALSE +for (yr in candidate_years) { + log_msg(" Fetching ", yr, " via backfill_edh_all.py...") + # Pass through Python stdout/stderr so a failure is debuggable from logs. + status <- system2( + "uv", c("run", "scripts/backfill_edh_all.py", "--year", as.character(yr)) + ) + if (status != 0) { + log_msg(" FAILED for ", yr, " (exit ", status, ") — continuing to next year") + any_fetch_errored <- TRUE + next + } + # Check all 7 variables wrote (backfill_edh_all.py skips partial years) + all_vars <- names(agg_methods) + wrote_all <- all(vapply(all_vars, function(v) { + file.exists(file.path(monthly_dir, paste0(v, "_", yr, ".tif"))) + }, logical(1))) + if (wrote_all) { + log_msg(" ", yr, ": wrote all 7 variables") + new_years_written <- c(new_years_written, yr) + } else { + log_msg(" ", yr, ": partial or unavailable on EDH yet, skipping") + } +} + +if (length(new_years_written) == 0) { + if (any_fetch_errored) { + log_msg("ERROR: attempted fetch(es) errored and no new years were written.") + log_msg("Exiting non-zero so the run is visibly failed.") + quit(status = 1) + } + log_msg("No new complete years available on EDH yet (latency is normal).") + quit(status = 0) +} +log_msg("New years to integrate: ", paste(new_years_written, collapse = ", ")) + +# -- Step 4: rebuild COGs (existing from S3 + new years) ---------------------- +log_msg("=== STEP 4: Append new years to existing COGs ===") + +all_vars <- names(agg_methods) +for (var in all_vars) { + method <- agg_methods[[var]] + + for (period in c("annual", names(seasons))) { + cog_name <- paste0(var, "_", period, ".tif") + cog_path <- file.path(cog_dir, cog_name) + + # Read existing COG from S3 (via /vsicurl) + existing_row <- catalog[catalog$variable == var & catalog$period == period, ] + if (nrow(existing_row) == 0) { + log_msg(" ", var, "_", period, ": not in catalog, skipping") + next + } + existing_rast <- tryCatch( + rast(paste0("/vsicurl/", existing_row$href)), + error = function(e) { + stop("Failed to read existing COG: ", existing_row$href, + "\nError: ", e$message, call. = FALSE) + } + ) + + # Aggregate new years + new_layers <- list() + for (yr in new_years_written) { + mf <- file.path(monthly_dir, paste0(var, "_", yr, ".tif")) + if (!file.exists(mf)) next + r_m <- rast(mf) + if (nlyr(r_m) != 12) next + periods <- cd_aggregate(r_m, method = method, seasons = seasons) + if (period %in% names(periods)) { + new_layers[[as.character(yr)]] <- periods[[period]] + } + } + if (length(new_layers) == 0) next + + new_rast <- rast(new_layers) + names(new_rast) <- names(new_layers) + # Sanity: existing COG from S3 must share grid with freshly aggregated + # layers. Floating-point epsilon differences in ext()/res() would break + # c() stacking — catch it explicitly rather than produce a cryptic error. + if (!isTRUE(all.equal(as.vector(ext(existing_rast)), + as.vector(ext(new_rast)), tolerance = 1e-6)) || + !isTRUE(all.equal(res(existing_rast), res(new_rast), tolerance = 1e-6))) { + stop("Grid mismatch between existing COG (", existing_row$href, ") and new ", + var, "_", period, ". Extent/res differ. Aborting.", call. = FALSE) + } + combined <- c(existing_rast, new_rast) + cd_cog_write(combined, cog_path, overwrite = TRUE) + log_msg(" Updated: ", cog_name, " (", nlyr(combined), " years total)") + } +} + +# -- Step 5: rebuild catalog + push ------------------------------------------- +log_msg("=== STEP 5: Rebuild catalog + push to S3 ===") +cd_stac_catalog( + cog_dir, + output_path = file.path(cog_dir, "catalog.json"), + base_url = paste0("https://", bucket, ".s3.us-west-2.amazonaws.com") +) +cd_s3_push(cog_dir, bucket = bucket, dry_run = FALSE) + +log_msg("=== UPDATE COMPLETE ===") +log_msg("Years added: ", paste(new_years_written, collapse = ", ")) diff --git a/scripts/probe_edh_vars.py b/scripts/probe_edh_vars.py new file mode 100644 index 0000000..091f8ca --- /dev/null +++ b/scripts/probe_edh_vars.py @@ -0,0 +1,222 @@ +#!/usr/bin/env -S uv run --script +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "xarray", +# "zarr", +# "fsspec", +# "aiohttp", +# "requests", +# "dask", +# "numpy", +# "rioxarray", +# "rasterio", +# ] +# /// +""" +Probe EDH to confirm each variable we need is what we expect, before +extending the backfill to all variables. + +Strategy: + 1. For one sample month (Jan 2000), pull EDH hourly data for each + candidate variable. + 2. Aggregate to monthly (mean for state variables, sum for precip). + 3. Compare against the existing CDS-derived monthly TIF in + data/backfill/monthly/ for the same variable and month. + 4. Report: units, value ranges, correlation, mean abs diff. + +If EDH output agrees with CDS to within expected rounding/grid-boundary +differences, we're safe to extend the backfill. + +Variables probed: + - t2m → tmean (monthly mean of hourly) + - d2m → dewpoint (monthly mean of hourly) + - tp → prcp (monthly SUM of hourly, × 1000 m → mm) + - swvl1..4 → soil_moisture (monthly mean of hourly, depth-weighted combine + done in R cd_derive_soil — here we just report per-depth mean) + +Usage: + uv run scripts/probe_edh_vars.py +""" +import os +import sys +import time +from pathlib import Path + +import numpy as np +import rasterio +import xarray as xr + +LAT_N, LAT_S = 60.0, 48.0 +LON_W, LON_E = -140.0, -114.0 + +YEAR = 2000 +MONTH = 1 # January +MONTH_LABEL = "Jan" + +REPO_ROOT = Path(__file__).resolve().parent.parent +MONTHLY_DIR = REPO_ROOT / "data" / "backfill" / "monthly" + + +def get_token() -> str: + token = os.environ.get("EDH_TOKEN") + if token: + return token + renviron = Path.home() / ".Renviron" + if renviron.exists(): + for line in renviron.read_text().splitlines(): + if line.strip().startswith("EDH_TOKEN="): + return line.strip().split("=", 1)[1] + sys.exit("EDH_TOKEN not found") + + +def read_cds_layer(var_name: str, layer_label: str = MONTH_LABEL) -> np.ndarray | None: + """Read one month-layer from an existing CDS-era TIF, if present.""" + path = MONTHLY_DIR / f"{var_name}_{YEAR}.tif" + if not path.exists(): + return None + with rasterio.open(path) as src: + if src.descriptions: + try: + idx = src.descriptions.index(layer_label) + 1 + except ValueError: + idx = 1 # fall back to first band if not labelled + else: + idx = 1 + return src.read(idx) + + +def summary(name: str, arr: np.ndarray, unit: str): + print(f" {name:30s}: {np.nanmin(arr):10.3f} {np.nanmax(arr):10.3f} " + f"{np.nanmean(arr):10.3f} {unit} (min, max, mean)") + + +def compare(name: str, edh: np.ndarray, cds: np.ndarray | None, unit: str): + if cds is None: + print(f" {name}: no CDS file to compare — EDH only") + summary(f" EDH {name}", edh, unit) + return + + # Dimensions may differ by 1 (121 vs 120 etc) — crop to min shape, which + # is only informative if the grids line up at a reference point. + min_rows = min(edh.shape[0], cds.shape[0]) + min_cols = min(edh.shape[1], cds.shape[1]) + e = edh[:min_rows, :min_cols] + c = cds[:min_rows, :min_cols] + + # Mask where either is NaN + mask = ~(np.isnan(e) | np.isnan(c)) + diff = e[mask] - c[mask] + + print(f"\n {name}:") + summary(f" EDH {name}", edh, unit) + summary(f" CDS {name}", cds, unit) + print(f" overlap (min shape {min_rows}x{min_cols}, {mask.sum()} shared non-NaN cells):") + print(f" diff (EDH - CDS): min={diff.min():.3f}, max={diff.max():.3f}, " + f"mean={diff.mean():.3f}, abs mean={np.abs(diff).mean():.3f} {unit}") + if len(diff) > 100: + corr = np.corrcoef(e[mask], c[mask])[0, 1] + print(f" correlation: {corr:.4f}") + + +def main(): + token = get_token() + zarr_url = ( + f"https://edh:{token}@data.earthdatahub.destine.eu/era5/" + "reanalysis-era5-land-no-antartica-v0.zarr" + ) + + print(f"[{time.strftime('%H:%M:%S')}] Opening EDH Zarr...") + ds = xr.open_dataset(zarr_url, chunks={}, engine="zarr") + + # Longitude translation + lon_min = float(ds.longitude.min()) + if lon_min >= 0: + bc_west, bc_east = LON_W + 360, LON_E + 360 + else: + bc_west, bc_east = LON_W, LON_E + + # Time slice for one month + n_days = [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31][MONTH - 1] # 2000 was leap + start = f"{YEAR}-{MONTH:02d}-01" + end = f"{YEAR}-{MONTH:02d}-{n_days}T23:00" + + print(f"[{time.strftime('%H:%M:%S')}] Subsetting to BC, {start} to {end}") + box = dict( + valid_time=slice(start, end), + latitude=slice(LAT_N, LAT_S), + longitude=slice(bc_west, bc_east), + ) + + # -- t2m → tmean ---------------------------------------------------------- + print(f"\n[{time.strftime('%H:%M:%S')}] --- t2m (→ tmean) ---") + t2m_hourly = ds["t2m"].sel(**box) + t2m_monthly = (t2m_hourly.mean(dim="valid_time") - 273.15).compute().values + cds_tmean_jan = read_cds_layer("tmean") + compare("tmean (°C)", t2m_monthly, cds_tmean_jan, "°C") + + # -- d2m → dewpoint ------------------------------------------------------- + print(f"\n[{time.strftime('%H:%M:%S')}] --- d2m (→ dewpoint) ---") + d2m_hourly = ds["d2m"].sel(**box) + d2m_monthly = (d2m_hourly.mean(dim="valid_time") - 273.15).compute().values + # CDS stored dewpoint in its raw pipeline stage; may not have a direct + # monthly TIF since we derive VPD/RH from it. Just report EDH summary. + summary("EDH dewpoint (°C)", d2m_monthly, "°C") + + # -- tp → prcp ------------------------------------------------------------ + # ERA5-Land tp is "total precipitation since start of forecast"; in the + # hourly version it is accumulated from 00 UTC reset each day in the + # most common convention. To get monthly precip (mm) we need to + # understand the accumulation behaviour first. + print(f"\n[{time.strftime('%H:%M:%S')}] --- tp (→ prcp) ---") + tp_hourly = ds["tp"].sel(**box) + # Probe: look at attributes and a single-cell time series to understand + # accumulation behaviour. + print(f" tp attrs: {dict(tp_hourly.attrs)}") + # Central-BC land cell: Prince George is ~53.9N, -122.75W → +237.25 in 0-360 + prince_george = tp_hourly.sel(latitude=53.9, longitude=237.25, method="nearest") + pg_first_48h = prince_george.isel(valid_time=slice(0, 48)).compute().values + print(f" Prince George first 48h tp (m): min={pg_first_48h.min():.6f} " + f"max={pg_first_48h.max():.6f} mean={pg_first_48h.mean():.6f}") + print(f" First 24 hours: {[f'{v:.6f}' for v in pg_first_48h[:24]]}") + print(f" Hours 24-48: {[f'{v:.6f}' for v in pg_first_48h[24:48]]}") + + # Both interpretations below are intentionally wrong to demonstrate that + # hourly tp accumulation cannot be naively summed. + # + # A: treat hourly as per-hour increment → 8× too high because tp is + # actually a running daily accumulation, not per-hour. + # B: sum of daily max using calendar-day grouping → still biased because + # the 01:00 UTC reset means the "max" within a calendar day window + # is typically at 00:00 (the END of the previous accumulation cycle), + # which is yesterday's total, not today's. + # + # The correct answer is to use EDH's daily product which pre-computes + # daily totals with the right semantics. Demonstrating the failure modes + # here so the decision is self-documenting. + tp_sum_mm = (tp_hourly.sum(dim="valid_time") * 1000).compute().values + tp_daily_max = tp_hourly.resample(valid_time="1D").max() + tp_daily_sum_mm = (tp_daily_max.sum(dim="valid_time") * 1000).compute().values + + cds_prcp_jan = read_cds_layer("prcp") + print("\n Interpretation A — hourly tp is per-hour increment (sum × 1000):") + compare("prcp_A (mm)", tp_sum_mm, cds_prcp_jan, "mm") + print("\n Interpretation B — hourly tp is daily accumulation (sum of daily max × 1000):") + compare("prcp_B (mm)", tp_daily_sum_mm, cds_prcp_jan, "mm") + + # -- swvl1..4 → soil_moisture --------------------------------------------- + print(f"\n[{time.strftime('%H:%M:%S')}] --- swvl1..4 (→ soil_moisture) ---") + for depth in (1, 2, 3, 4): + v = f"swvl{depth}" + sm = ds[v].sel(**box).mean(dim="valid_time").compute().values + summary(f"EDH {v} (m³/m³)", sm, "m³/m³") + # Existing CDS-derived soil_moisture is a single composite layer per month. + cds_sm_jan = read_cds_layer("soil_moisture") + if cds_sm_jan is not None: + summary("CDS soil_moisture composite (m³/m³)", cds_sm_jan, "m³/m³") + + print(f"\n[{time.strftime('%H:%M:%S')}] DONE") + + +if __name__ == "__main__": + main() diff --git a/scripts/qa_monthly.R b/scripts/qa_monthly.R new file mode 100644 index 0000000..f308760 --- /dev/null +++ b/scripts/qa_monthly.R @@ -0,0 +1,147 @@ +#!/usr/bin/env Rscript +# +# QA: check all monthly GeoTIFFs in data/backfill/monthly/ for alignment +# and sanity before Stage 3 COG/STAC/S3. +# +# Checks: +# 1. Grid alignment — extent, resolution, CRS, origin identical across +# all variables for a sample year (2000) +# 2. Time coverage per variable +# 3. Physical sanity — tmin <= tmean <= tmax at each pixel/month, for +# a handful of sample years +# 4. Value ranges per variable stay within plausible bounds +# 5. CDS vs EDH comparison — mean of monthly climate for a shared variable +# (if any CDS-era file still exists) to spot obvious shifts + +suppressMessages(library(terra)) + +monthly_dir <- "data/backfill/monthly" +log_msg <- function(...) cat(sprintf("[%s] %s\n", format(Sys.time(), "%H:%M:%S"), paste0(...))) + +# -- 1. Grid alignment across variables (sample year 2000) -------------------- +log_msg("=== 1. GRID ALIGNMENT (year 2000) ===") +vars <- c("tmax", "tmin", "tmean", "prcp", "vpd", "rh", "soil_moisture") +ref <- NULL +for (v in vars) { + f <- file.path(monthly_dir, paste0(v, "_2000.tif")) + if (!file.exists(f)) { + log_msg(" ", v, ": MISSING") + next + } + r <- rast(f) + e <- as.vector(ext(r)) + res_r <- res(r) + crs_code <- crs(r, describe = TRUE)$code + if (is.na(crs_code) || length(crs_code) == 0) crs_code <- "UNKNOWN" + if (is.null(ref)) { + ref <- list(ext = e, res = res_r, crs = crs_code, ncell = ncell(r)) + log_msg(" ", v, ": [REF] ext=", paste(round(e, 3), collapse = ","), + " res=", paste(round(res_r, 4), collapse = ","), + " crs=", crs_code, " ncell=", ncell(r), " nlyr=", nlyr(r)) + } else { + same_ext <- isTRUE(all.equal(e, ref$ext, tolerance = 1e-6)) + same_res <- isTRUE(all.equal(res_r, ref$res, tolerance = 1e-6)) + same_crs <- identical(crs_code, ref$crs) + same_ncell <- ncell(r) == ref$ncell + status <- if (same_ext && same_res && same_crs && same_ncell) "OK" else "MISALIGNED" + log_msg(" ", v, ": ", status, + if (!same_ext) paste0(" ext_diff=", paste(round(e - ref$ext, 5), collapse = ",")) else "", + if (!same_res) paste0(" res_diff=", paste(round(res_r - ref$res, 6), collapse = ",")) else "", + if (!same_crs) paste0(" crs=", crs_code) else "", + if (!same_ncell) paste0(" ncell=", ncell(r)) else "", + " nlyr=", nlyr(r)) + } +} + +# -- 2. Time coverage per variable -------------------------------------------- +log_msg("") +log_msg("=== 2. TIME COVERAGE ===") +for (v in vars) { + files <- list.files(monthly_dir, pattern = paste0("^", v, "_\\d{4}\\.tif$"), full.names = FALSE) + years <- sort(as.integer(sub(paste0(v, "_(\\d{4})\\.tif"), "\\1", files))) + if (length(years) == 0) { + log_msg(" ", v, ": no files") + next + } + gaps <- setdiff(seq(min(years), max(years)), years) + log_msg(" ", v, ": ", length(years), " years, ", min(years), " to ", max(years), + if (length(gaps) > 0) paste0(", GAPS: ", paste(gaps, collapse = ",")) else ", no gaps") +} + +# -- 3. Physical sanity tmin <= tmean <= tmax (sample years) ------------------- +log_msg("") +log_msg("=== 3. PHYSICAL SANITY tmin <= tmean <= tmax ===") +sample_years <- c(1960, 1990, 2010, 2024) +for (y in sample_years) { + fx <- file.path(monthly_dir, paste0("tmax_", y, ".tif")) + fn <- file.path(monthly_dir, paste0("tmin_", y, ".tif")) + fm <- file.path(monthly_dir, paste0("tmean_", y, ".tif")) + if (!all(file.exists(fx), file.exists(fn), file.exists(fm))) { + log_msg(" ", y, ": one or more files missing, skip") + next + } + x <- rast(fx); n <- rast(fn); m <- rast(fm) + # Check on Jan (layer 1) and Jul (layer 7) + for (lyr in c(1, 7)) { + # Cell-wise comparison — count violations + v_xmin <- values(n[[lyr]] > x[[lyr]], na.rm = TRUE) + v_mmax <- values(m[[lyr]] > x[[lyr]], na.rm = TRUE) + v_mmin <- values(m[[lyr]] < n[[lyr]], na.rm = TRUE) + n_viol_xmin <- sum(v_xmin, na.rm = TRUE) + n_viol_mmax <- sum(v_mmax, na.rm = TRUE) + n_viol_mmin <- sum(v_mmin, na.rm = TRUE) + total <- sum(!is.na(values(x[[lyr]]))) + log_msg(sprintf(" %d %s: tmin>tmax=%d, tmean>tmax=%d, tmean= exp[1] && v_max <= exp[2] + log_msg(sprintf(" %s: [%.2f, %.2f] (expected [%s, %s]) %s", + v, v_min, v_max, + if (!is.null(exp)) as.character(exp[1]) else "?", + if (!is.null(exp)) as.character(exp[2]) else "?", + if (in_range) "OK" else "OUT OF RANGE")) +} + +# -- 5. CDS vs EDH tmean comparison (if CDS tmean exists) --------------------- +# If our CDS tmean and EDH tmax/tmin cover the same year, we can check that +# (tmax + tmin) / 2 is roughly correlated with tmean. Not strict equality +# because tmean is mean of hourly, while (tmax+tmin)/2 is the old-school +# climatologist's approximation — but they should agree within ~5C. +log_msg("") +log_msg("=== 5. CDS tmean vs EDH (tmax+tmin)/2 — Jan 2000 ===") +fm <- file.path(monthly_dir, "tmean_2000.tif") +fx <- file.path(monthly_dir, "tmax_2000.tif") +fn <- file.path(monthly_dir, "tmin_2000.tif") +if (all(file.exists(fm), file.exists(fx), file.exists(fn))) { + m <- rast(fm)[[1]] + approx <- (rast(fx)[[1]] + rast(fn)[[1]]) / 2 + diff_r <- approx - m + s <- global(diff_r, c("mean", "min", "max"), na.rm = TRUE) + log_msg(sprintf(" (tmax+tmin)/2 - tmean: mean=%.2f, min=%.2f, max=%.2f C", + s$mean, s$min, s$max)) + log_msg(" Expected: mean near 0, typical max/min within ~5 C for most cells") +} else { + log_msg(" skip: missing files") +} + +log_msg("") +log_msg("=== QA DONE ===") diff --git a/scripts/test_edh_era5_land.py b/scripts/test_edh_era5_land.py new file mode 100644 index 0000000..25305f3 --- /dev/null +++ b/scripts/test_edh_era5_land.py @@ -0,0 +1,118 @@ +#!/usr/bin/env -S uv run --script +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "xarray", +# "zarr", +# "fsspec", +# "aiohttp", +# "requests", +# "dask", +# "numpy", +# ] +# /// +""" +Benchmark test: pull one month of ERA5-Land hourly 2m_temperature for BC +bbox from DestinE Earth Data Hub (Zarr) and compare to CDS throughput. + +Reference: CDS takes ~80s per month (download + polite sleep). If EDH Zarr +can do the same in <30s, we have a clear winner. + +Portable — uses PEP 723 inline deps. Run with: + uv run scripts/test_edh_era5_land.py + +Token lookup order: EDH_TOKEN env var, then ~/.Renviron. +""" +import os +import time +import xarray as xr +import numpy as np + +# -- Config -------------------------------------------------------------------- +# BC bbox (matches scripts/pipeline_tmax_tmin_hourly.R) +# area format in ecmwfr: c(N, W, S, E) = c(60, -140, 48, -114) +LAT_N, LAT_S = 60.0, 48.0 +LON_W, LON_E = -140.0, -114.0 + +# One test month to benchmark against CDS +YEAR = 1960 +MONTH = 1 + +# -- Token --------------------------------------------------------------------- +token = os.environ.get("EDH_TOKEN") +if not token: + # Fallback: try reading from ~/.Renviron + renviron = os.path.expanduser("~/.Renviron") + if os.path.exists(renviron): + with open(renviron) as f: + for line in f: + if line.strip().startswith("EDH_TOKEN="): + token = line.strip().split("=", 1)[1] + break +if not token: + raise SystemExit("EDH_TOKEN not found in env or ~/.Renviron") + +# -- Open Zarr ----------------------------------------------------------------- +zarr_url = ( + f"https://edh:{token}@data.earthdatahub.destine.eu/era5/" + "reanalysis-era5-land-no-antartica-v0.zarr" +) + +print(f"Opening Zarr store for ERA5-Land hourly...") +t0 = time.time() +ds = xr.open_dataset(zarr_url, chunks={}, engine="zarr") +print(f" Opened in {time.time() - t0:.1f}s") +print(f" Variables: {list(ds.data_vars)}") +print(f" Full extent: {dict(ds.sizes)}") +print(f" Time range: {ds.valid_time.values.min()} to {ds.valid_time.values.max()}") + +# -- Subset to BC + one month -------------------------------------------------- +print(f"\nSubsetting to BC bbox, {YEAR}-{MONTH:02d} ...") +t0 = time.time() + +# EDH uses longitude 0-360 or -180-180? Check coord convention +lon_min, lon_max = float(ds.longitude.min()), float(ds.longitude.max()) +print(f" Longitude convention: {lon_min} to {lon_max}") + +# Translate BC bbox if EDH uses 0-360 +if lon_min >= 0: + bc_west = LON_W + 360 + bc_east = LON_E + 360 +else: + bc_west, bc_east = LON_W, LON_E + +start = f"{YEAR}-{MONTH:02d}-01" +end = f"{YEAR}-{MONTH:02d}-28" if MONTH == 2 else f"{YEAR}-{MONTH:02d}-{[31,28,31,30,31,30,31,31,30,31,30,31][MONTH-1]}" + +subset = ds["t2m"].sel( + valid_time=slice(start, end), + latitude=slice(LAT_N, LAT_S), # EDH typically has lat descending + longitude=slice(bc_west, bc_east), +) +print(f" Subset shape: {dict(subset.sizes)}") +assert all(v > 0 for v in subset.sizes.values()), ( + f"Empty subset {dict(subset.sizes)} — check lat direction / lon convention" +) + +# Force compute (download chunks) +values = subset.values +elapsed = time.time() - t0 +size_mb = values.nbytes / 1e6 +print(f" Downloaded and materialized in {elapsed:.1f}s") +print(f" In-memory size: {size_mb:.1f} MB") +n_total = values.size +n_nan = np.isnan(values).sum() +print(f" NaN cells (ocean): {n_nan} / {n_total} ({100 * n_nan / n_total:.1f}%)") +print(f" Values (land only): {np.nanmin(values):.1f} to {np.nanmax(values):.1f} K") +print(f" Values (land only, C): {np.nanmin(values) - 273.15:.1f} to {np.nanmax(values) - 273.15:.1f} C") + +# -- Verdict ------------------------------------------------------------------- +print("\n=== BENCHMARK VS CDS ===") +print(f"EDH: {elapsed:.1f}s for one month BC hourly t2m") +print(f"CDS: ~80s per month (download + polite sleep)") +if elapsed < 60: + print(f"EDH is {80 / elapsed:.1f}x faster — clear winner") +elif elapsed < 120: + print(f"EDH is comparable to CDS but without quota issues") +else: + print(f"EDH is slower than CDS — not worth switching on speed alone")