"""Rebuild every clean dataset from raw UKDS files held on GCS.

Pipeline per job: download raw tab files from gs://policyengine-uk-microdata/ukds/
→ run the Rust extraction → upload clean CSVs to gs://policyengine-uk-microdata/<dataset>/<year>/.

Assumes:
  - `gcloud storage` CLI is authenticated and can read/write the bucket.
  - `cargo` is on PATH and the workspace builds cleanly.

Usage:
  python scripts/rebuild_all.py                    # rebuild everything
  python scripts/rebuild_all.py --only lcfs        # rebuild just LCFS years
  python scripts/rebuild_all.py --only frs --year 2023
  python scripts/rebuild_all.py --only efrs        # rebuild EFRS for all FRS years we have
  python scripts/rebuild_all.py --work-dir /tmp/pe # use a fixed working dir (cached)
  python scripts/rebuild_all.py --keep             # keep the working dir after running
"""
| 18 | + |
| 19 | +from __future__ import annotations |
| 20 | + |
| 21 | +import argparse |
| 22 | +import os |
| 23 | +import shutil |
| 24 | +import subprocess |
| 25 | +import sys |
| 26 | +import tempfile |
| 27 | +from dataclasses import dataclass |
| 28 | +from pathlib import Path |
| 29 | + |
BUCKET = "gs://policyengine-uk-microdata"
RAW_PREFIX = f"{BUCKET}/ukds"
# scripts/ lives one level below the repo root, so go up twice from this file.
REPO_ROOT = Path(__file__).resolve().parent.parent

# Extra search paths for gcloud/cargo that might not be on the default subprocess PATH.
_EXTRA_PATHS = [
    Path.home() / ".cargo" / "bin",
    Path.home() / "Downloads" / "google-cloud-sdk" / "bin",
    Path("/opt/homebrew/bin"),
    Path("/usr/local/bin"),
]
for _p in _EXTRA_PATHS:
    # Compare against the individual PATH entries rather than doing a raw
    # substring match: a substring test would wrongly skip e.g. /usr/local/bin
    # when PATH merely contains /usr/local/binaries. Using os.pathsep keeps
    # the split/join portable instead of hard-coding ":".
    _entries = os.environ.get("PATH", "").split(os.pathsep)
    if _p.is_dir() and str(_p) not in _entries:
        os.environ["PATH"] = os.pathsep.join([str(_p), *_entries])
| 44 | + |
| 45 | + |
def _require(tool: str) -> None:
    """Abort with a helpful message unless *tool* is resolvable on PATH."""
    if shutil.which(tool) is not None:
        return
    raise SystemExit(
        f"{tool!r} not found on PATH. Install it or add it to PATH before running."
    )
| 51 | + |
| 52 | + |
@dataclass
class ExtractJob:
    """Describes a single raw survey → clean CSV extraction.

    Attributes:
        dataset: Dataset family — frs | lcfs | spi | was.
        year: Target fiscal year for the clean output directory.
        raw_ref: Path under ukds/ (e.g. "frs/2023", "was/round_7").
        rust_flag: Flag handed to the Rust extractor — --frs | --lcfs | --spi | --was.
    """

    dataset: str
    year: int
    raw_ref: str
    rust_flag: str
| 60 | + |
| 61 | + |
# Manifest of everything we can rebuild. Extend as new raw years arrive on the bucket.
JOBS: list[ExtractJob] = [
    ExtractJob(dataset, year, raw_ref, flag)
    for dataset, year, raw_ref, flag in [
        ("frs", 2022, "frs/2022", "--frs"),
        ("frs", 2023, "frs/2023", "--frs"),
        ("lcfs", 2019, "lcfs/2019", "--lcfs"),
        ("lcfs", 2021, "lcfs/2021", "--lcfs"),
        ("lcfs", 2022, "lcfs/2022", "--lcfs"),
        ("spi", 2021, "spi/2021", "--spi"),
        ("spi", 2022, "spi/2022", "--spi"),
        ("was", 2020, "was/round_7", "--was"),
        ("was", 2022, "was/round_8", "--was"),
    ]
]

# EFRS pipeline: (fiscal_year, frs_year, was_ref, lcfs_ref).
# Each tuple names the raw references the enhanced FRS is composed from.
EFRS_JOBS: list[tuple[int, int, str, str]] = [
    (2023, 2023, "was/round_7", "lcfs/2021"),
]
| 80 | + |
| 81 | + |
| 82 | +def run(cmd: list, cwd: Path | None = None) -> None: |
| 83 | + print(f" $ {' '.join(str(c) for c in cmd)}", flush=True) |
| 84 | + subprocess.run([str(c) for c in cmd], cwd=cwd, check=True) |
| 85 | + |
| 86 | + |
def gcs_copy_in(ref: str, dest: Path) -> None:
    """Download everything under ukds/<ref>/ into dest/."""
    dest.mkdir(parents=True, exist_ok=True)
    # `gcloud storage cp -r` copies the listed objects verbatim.
    cmd = ["gcloud", "storage", "cp", "-r", f"{RAW_PREFIX}/{ref}/*", str(dest)]
    run(cmd)
| 92 | + |
| 93 | + |
def gcs_copy_out(local_dir: Path, dataset: str, year: int) -> None:
    """Upload the clean CSVs in local_dir to <bucket>/<dataset>/<year>/."""
    # Only CSVs are uploaded; any stray files in local_dir are ignored.
    csvs = sorted(local_dir.glob("*.csv"))
    if not csvs:
        raise SystemExit(f"No CSV files in {local_dir}; extraction probably failed")
    target = f"{BUCKET}/{dataset}/{year}/"
    run(["gcloud", "storage", "cp", *[str(p) for p in csvs], target])
| 101 | + |
| 102 | + |
def ensure_raw(ref: str, work: Path) -> Path:
    """Download raw ukds/<ref> to work/raw/<ref>, caching if already present."""
    raw_dir = work / "raw" / ref
    already_there = raw_dir.is_dir() and any(raw_dir.iterdir())
    if already_there:
        print(f"  (cached) {raw_dir}")
    else:
        gcs_copy_in(ref, raw_dir)
    return raw_dir
| 111 | + |
| 112 | + |
def extract_one(job: ExtractJob, work: Path) -> Path:
    """Run the Rust extractor for one job and upload the clean CSVs.

    Returns the local directory that holds the extracted CSVs.
    """
    print(f"\n=== {job.dataset.upper()} {job.year} ===")
    raw_dir = ensure_raw(job.raw_ref, work)
    clean_dir = work / "clean" / job.dataset / str(job.year)
    clean_dir.mkdir(parents=True, exist_ok=True)
    cargo_cmd = [
        "cargo", "run", "--release", "--quiet", "--",
        job.rust_flag, str(raw_dir),
        "--year", str(job.year),
        "--extract", str(clean_dir),
    ]
    run(cargo_cmd, cwd=REPO_ROOT)
    gcs_copy_out(clean_dir, job.dataset, job.year)
    return clean_dir
| 129 | + |
| 130 | + |
def extract_efrs(fiscal_year: int, frs_year: int, was_ref: str, lcfs_ref: str, work: Path) -> None:
    """Build and upload the enhanced FRS for *fiscal_year*.

    Composes the clean FRS for *frs_year* with the raw WAS and LCFS
    references, then uploads the result to the bucket under efrs/<year>/.
    """
    print(f"\n=== EFRS {fiscal_year} (from FRS {frs_year}, {was_ref}, {lcfs_ref}) ===")

    # The clean FRS is the base input. If an earlier step of this run already
    # extracted it, it is on disk under work/clean/frs/<year>/; otherwise pull
    # the clean files down from the bucket.
    frs_clean = work / "clean" / "frs" / str(frs_year)
    have_frs = frs_clean.is_dir() and (frs_clean / "households.csv").exists()
    if not have_frs:
        frs_clean.mkdir(parents=True, exist_ok=True)
        sources = [
            f"{BUCKET}/frs/{frs_year}/{name}.csv"
            for name in ("persons", "benunits", "households")
        ]
        run(["gcloud", "storage", "cp", *sources, str(frs_clean) + "/"])

    frs_base = work / "clean" / "frs"  # parent dir with YYYY/ subdirs
    was_raw = ensure_raw(was_ref, work)
    lcfs_raw = ensure_raw(lcfs_ref, work)

    efrs_out = work / "clean" / "efrs" / str(fiscal_year)
    efrs_out.mkdir(parents=True, exist_ok=True)
    cargo_cmd = [
        "cargo", "run", "--release", "--quiet", "--",
        "--extract-efrs", str(efrs_out),
        "--data", str(frs_base),
        "--year", str(fiscal_year),
        "--was-dir", str(was_raw),
        "--lcfs-dir", str(lcfs_raw),
    ]
    run(cargo_cmd, cwd=REPO_ROOT)
    gcs_copy_out(efrs_out, "efrs", fiscal_year)
| 165 | + |
| 166 | + |
def main() -> None:
    """Parse CLI options and run the selected rebuild jobs.

    Raises SystemExit on bad usage (argparse) or missing tools (_require).
    The working directory is removed afterwards unless --keep or --work-dir
    was given; cleanup runs even if a job fails part-way.
    """
    parser = argparse.ArgumentParser(description=__doc__ or "")
    parser.add_argument(
        "--only",
        choices=["frs", "lcfs", "spi", "was", "efrs"],
        help="Only rebuild one dataset family",
    )
    parser.add_argument("--year", type=int, help="Only rebuild this fiscal year")
    parser.add_argument(
        "--work-dir",
        type=Path,
        help="Use this directory instead of a temp dir (enables caching)",
    )
    parser.add_argument(
        "--keep",
        action="store_true",
        help="Keep the working dir after running (ignored with --work-dir)",
    )
    args = parser.parse_args()

    _require("gcloud")
    _require("cargo")

    if args.work_dir:
        work = args.work_dir.resolve()
        work.mkdir(parents=True, exist_ok=True)
        cleanup = False  # never delete a user-supplied directory
    else:
        work = Path(tempfile.mkdtemp(prefix="pe-uk-rebuild-"))
        cleanup = not args.keep
    print(f"Working directory: {work}")

    selected_jobs = JOBS
    if args.only and args.only != "efrs":
        selected_jobs = [j for j in JOBS if j.dataset == args.only]
    if args.year is not None:
        selected_jobs = [j for j in selected_jobs if j.year == args.year]

    # EFRS runs by default and when explicitly requested, but not when the
    # user restricted the run to another family.
    run_efrs = args.only in (None, "efrs")

    try:
        if args.only != "efrs":
            if not selected_jobs:
                # Previously this case was a silent no-op; warn so a typo'd
                # --only/--year combination is obvious.
                print(f"Warning: no jobs match --only={args.only} --year={args.year}")
            for job in selected_jobs:
                extract_one(job, work)

        if run_efrs:
            matched = False
            for fiscal_year, frs_year, was_ref, lcfs_ref in EFRS_JOBS:
                if args.year is not None and fiscal_year != args.year:
                    continue
                matched = True
                extract_efrs(fiscal_year, frs_year, was_ref, lcfs_ref, work)
            if not matched and args.only == "efrs":
                print(f"Warning: no EFRS job for year {args.year}")
    finally:
        if cleanup:
            shutil.rmtree(work, ignore_errors=True)

    print("\nAll done.")
| 222 | + |
| 223 | + |
if __name__ == "__main__":
    # main() returns None, so this exits with status 0 on success —
    # identical to sys.exit(main()).
    raise SystemExit(main())
0 commit comments