diff --git a/scripts/build_targets/dwp.py b/scripts/build_targets/dwp.py index 5f6f1eb..0cb4840 100644 --- a/scripts/build_targets/dwp.py +++ b/scripts/build_targets/dwp.py @@ -375,6 +375,81 @@ def _fetch_uc_breakdowns() -> list[dict]: except Exception as e: logger.warning("Failed to fetch UC housing breakdown: %s", e) + # UC households by monthly payment band — constrains the UC amount distribution + try: + result = _query_table( + _UC_HH_DB, + [_UC_HH_COUNT], + [[f"{_UC_HH_FIELD}:hnpayment_band"]], + ) + year = _extract_year(result) + pairs = _extract_breakdown(result) + + # Consolidate into wider bands (monthly £ → annual £ for filter ranges). + # Stat-xplore bands are £100-wide from £0 to £2500+. We group into ~£300-400 + # bands to keep the target count reasonable while constraining the distribution. + _BAND_GROUPS = [ + ("0_to_300", 0, 300), + ("300_to_600", 300, 600), + ("600_to_900", 600, 900), + ("900_to_1200", 900, 1200), + ("1200_to_1500", 1200, 1500), + ("1500_to_2000", 1500, 2000), + ("2000_plus", 2000, 999999), + ] + + # Parse stat-xplore band labels into (lower_monthly, upper_monthly, count) + parsed_bands = [] + for label, count in pairs: + low_label = label.lower().strip() + if "no payment" in low_label: + parsed_bands.append((0, 0, count)) + elif "or over" in low_label: + # e.g. 
"£2500.01 or over" + val = float(low_label.split("£")[1].split(" ")[0]) + parsed_bands.append((val, 999999, count)) + elif " to " in low_label: + parts = low_label.replace("£", "").replace(",", "").split(" to ") + lo = float(parts[0]) + hi = float(parts[1]) + parsed_bands.append((lo, hi, count)) + + # Aggregate into grouped bands + for group_name, group_lo, group_hi in _BAND_GROUPS: + group_count = 0.0 + for lo, hi, count in parsed_bands: + if lo == 0 and hi == 0: + continue # skip "no payment" + band_mid = (lo + min(hi, 5000)) / 2.0 + if group_lo <= band_mid < group_hi: + group_count += count + + if group_count > 0: + # Filter range: convert monthly band to annual + annual_lo = group_lo * 12.0 + annual_hi = group_hi * 12.0 + + targets.append( + { + "name": f"dwp/uc_payment_band_{group_name}", + "variable": "universal_credit", + "entity": "benunit", + "aggregation": "count_nonzero", + "filter": { + "variable": "universal_credit", + "min": annual_lo, + "max": annual_hi, + }, + "value": group_count, + "source": "dwp", + "year": year, + "holdout": False, + } + ) + + except Exception as e: + logger.warning("Failed to fetch UC payment band breakdown: %s", e) + return targets @@ -384,10 +459,10 @@ def _fetch_uc_breakdowns() -> list[dict]: ) DWP_FORECAST_FILE = CACHE_DIR / "dwp_spring_statement_2025.xlsx" -CALIBRATION_YEARS = range(2024, 2030) # 2024/25 through 2029/30 +CALIBRATION_YEARS = range(2023, 2030) # 2023/24 through 2029/30 # Column 80 = 2024/25, ..., 85 = 2029/30 in the DWP forecast xlsx -_FORECAST_COL_TO_YEAR = {80: 2024, 81: 2025, 82: 2026, 83: 2027, 84: 2028, 85: 2029} +_FORECAST_COL_TO_YEAR = {79: 2023, 80: 2024, 81: 2025, 82: 2026, 83: 2027, 84: 2028, 85: 2029} def _download_forecast() -> Path: @@ -530,6 +605,20 @@ def _scale_targets_to_years( "dwp/uc_households_single_with_children": "universal_credit", "dwp/uc_households_couple_no_children": "universal_credit", "dwp/uc_households_couple_with_children": "universal_credit", + # Payment band breakdowns scale 
with total UC + "dwp/uc_payment_band_0_to_300": "universal_credit", + "dwp/uc_payment_band_300_to_600": "universal_credit", + "dwp/uc_payment_band_600_to_900": "universal_credit", + "dwp/uc_payment_band_900_to_1200": "universal_credit", + "dwp/uc_payment_band_1200_to_1500": "universal_credit", + "dwp/uc_payment_band_1500_to_2000": "universal_credit", + "dwp/uc_payment_band_2000_plus": "universal_credit", + # Age band breakdowns scale with total UC + "dwp/uc_age_16_24": "universal_credit", + "dwp/uc_age_25_34": "universal_credit", + "dwp/uc_age_35_49": "universal_credit", + "dwp/uc_age_50_64": "universal_credit", + "dwp/uc_age_65_plus": "universal_credit", } scaled: list[dict] = [] diff --git a/scripts/build_targets/hmrc.py b/scripts/build_targets/hmrc.py index a9986c8..549e51f 100644 --- a/scripts/build_targets/hmrc.py +++ b/scripts/build_targets/hmrc.py @@ -28,7 +28,7 @@ # HMRC SPI 2022-23 collated tables (ODS) SPI_URL = "https://assets.publishing.service.gov.uk/media/67cabb37ade26736dbf9ffe5/Collated_Tables_3_1_to_3_17_2223.ods" SPI_YEAR = 2022 # FY 2022-23 → base year for growth indexing -CALIBRATION_YEARS = range(2024, 2031) +CALIBRATION_YEARS = range(2023, 2031) INCOME_BANDS_LOWER = [ 12_570, diff --git a/scripts/build_targets/obr.py b/scripts/build_targets/obr.py index 89a3621..5da95f6 100644 --- a/scripts/build_targets/obr.py +++ b/scripts/build_targets/obr.py @@ -319,29 +319,30 @@ def _parse_welfare() -> list[dict]: ("State pension", "obr/state_pension", "state_pension", "benunit"), ] - # UC appears twice in 4.9 — inside and outside the welfare cap. We want both. - uc_rows_found = 0 + # UC appears twice in 4.9 — inside and outside the welfare cap. Sum them + # into a single total UC spend target since our simulation doesn't + # distinguish the two components. 
+ uc_by_year: dict[int, float] = {} for row_num in range(6, 50): val = ws[f"B{row_num}"].value if val and str(val).strip().startswith("Universal credit"): - uc_rows_found += 1 - suffix = "in_cap" if uc_rows_found == 1 else "outside_cap" values = _read_row(ws, row_num, _WELFARE_COL_TO_YEAR) for year, value in values.items(): - targets.append( - { - "name": f"obr/universal_credit_{suffix}/{year}", - "variable": "universal_credit", - "entity": "benunit", - "aggregation": "sum", - "filter": None, - "value": value, - "source": "obr", - "year": year, - "holdout": suffix - == "outside_cap", # Only use one UC total for training - } - ) + uc_by_year[year] = uc_by_year.get(year, 0.0) + value + for year, value in uc_by_year.items(): + targets.append( + { + "name": f"obr/universal_credit_total/{year}", + "variable": "universal_credit", + "entity": "benunit", + "aggregation": "sum", + "filter": None, + "value": value, + "source": "obr", + "year": year, + "holdout": False, + } + ) for label, name, variable, entity in benefit_rows: row = _find_row(ws, label) @@ -534,6 +535,61 @@ def _parse_economy() -> list[dict]: return targets +def _backfill_2023(targets: list[dict]) -> list[dict]: + """Back-extrapolate 2023 targets from 2024 outturn. + + The March 2026 EFO's earliest column is 2024/25 outturn. For 2023/24 we + scale backwards using OBR growth rates: earnings growth for tax receipts, + CPI for benefit spending, council tax growth for council tax. 
+ """ + # OBR growth rates for the 2023→2024 transition (from economy tables) + EARNINGS_GROWTH_2024 = 0.0493 + CPI_GROWTH_2024 = 0.0253 + CT_GROWTH_2024 = 0.051 + + # Which growth factor to use for each target prefix + _DEFLATOR = { + "obr/income_tax": EARNINGS_GROWTH_2024, + "obr/ni_": EARNINGS_GROWTH_2024, + "obr/vat_": EARNINGS_GROWTH_2024, + "obr/fuel_duty": EARNINGS_GROWTH_2024, + "obr/cgt_": EARNINGS_GROWTH_2024, + "obr/sdlt_": EARNINGS_GROWTH_2024, + "obr/council_tax": CT_GROWTH_2024, + "obr/housing_benefit": CPI_GROWTH_2024, + "obr/pip_dla": CPI_GROWTH_2024, + "obr/attendance_allowance": CPI_GROWTH_2024, + "obr/pension_credit": CPI_GROWTH_2024, + "obr/carers_allowance": CPI_GROWTH_2024, + "obr/child_benefit": CPI_GROWTH_2024, + "obr/state_pension": CPI_GROWTH_2024, + "obr/universal_credit": CPI_GROWTH_2024, + } + + existing_2023 = {t["name"] for t in targets if t["year"] == 2023} + extra = [] + for t in targets: + if t["year"] != 2024 or t["source"] != "obr": + continue + name_2023 = t["name"].replace("/2024", "/2023") + if name_2023 in existing_2023: + continue + # Find the right deflator + growth = None + for prefix, rate in _DEFLATOR.items(): + if t["name"].startswith(prefix): + growth = rate + break + if growth is None: + continue + t2 = dict(t) + t2["name"] = name_2023 + t2["year"] = 2023 + t2["value"] = t["value"] / (1 + growth) + extra.append(t2) + return targets + extra + + def get_targets() -> list[dict]: targets = [] if RECEIPTS_FILE.exists(): @@ -544,4 +600,5 @@ def get_targets() -> list[dict]: targets.extend(_parse_council_tax()) if ECONOMY_FILE.exists(): targets.extend(_parse_economy()) + targets = _backfill_2023(targets) return targets diff --git a/scripts/build_targets/ons.py b/scripts/build_targets/ons.py index 39462ef..6091484 100644 --- a/scripts/build_targets/ons.py +++ b/scripts/build_targets/ons.py @@ -1,11 +1,13 @@ """ONS demographic calibration targets. -Population by age group, total households, and regional distribution. 
-These are from ONS mid-year population estimates and household projections. +Population by age group, total households, tenure distribution, and regional +population. From ONS mid-year population estimates, household projections, +and English Housing Survey / census tenure breakdowns. Sources: - ONS mid-year population estimates 2023 - ONS household projections +- English Housing Survey / Census 2021 tenure distribution (UK-adjusted) """ from __future__ import annotations @@ -40,6 +42,34 @@ "northern_ireland": 1_900_000, } +# Household tenure distribution (UK, ~2023). +# Source: EHS 2022-23 headline report + census 2021 proportions for DA adjustment. +# tenure_type RF codes: 0=OwnedOutright, 1=OwnedWithMortgage, 2=RentFromCouncil, +# 3=RentFromHA, 4=RentPrivately, 5=Other. +# We combine social rent (council + HA) and use 3 broad categories. +_TENURE_HOUSEHOLDS = { + "owned_outright": (0, 0, 8_800_000), # ~31% + "owned_mortgage": (1, 1, 6_600_000), # ~23% + "social_rent": (2, 3, 4_700_000), # ~17% (council + HA) + "private_rent": (4, 4, 4_900_000), # ~17% +} + +# Region RF codes matching the Rust enum. +_REGION_RF_CODE = { + "north_east": 0, + "north_west": 1, + "yorkshire": 2, + "east_midlands": 3, + "west_midlands": 4, + "east_of_england": 5, + "london": 6, + "south_east": 7, + "south_west": 8, + "wales": 9, + "scotland": 10, + "northern_ireland": 11, +} + def get_targets() -> list[dict]: """Generate ONS demographic targets for all calibration years. 
@@ -51,7 +81,7 @@ def get_targets() -> list[dict]: targets = [] # Emit for all plausible calibration years - for year in range(2024, 2031): + for year in range(2023, 2031): # Age group population counts for group, count in _POPULATION.items(): if group == "total": @@ -107,4 +137,49 @@ def get_targets() -> list[dict]: } ) + # Households by tenure + for tenure_name, (code_lo, code_hi, count) in _TENURE_HOUSEHOLDS.items(): + targets.append( + { + "name": f"ons/tenure_{tenure_name}/{year}", + "variable": "household_id", + "entity": "household", + "aggregation": "count", + "filter": { + "variable": "tenure_type", + "min": float(code_lo), + "max": float(code_hi) + 1.0, # exclusive upper bound + }, + "value": float(count), + "source": "ons", + "year": year, + "holdout": False, + } + ) + + # Households by region + for region_name, code in _REGION_RF_CODE.items(): + pop = _REGIONAL_POPULATION.get(region_name, 0) + if pop == 0: + continue + # Approximate households from population using national ratio + hh_count = pop * _TOTAL_HOUSEHOLDS / _POPULATION["total"] + targets.append( + { + "name": f"ons/region_{region_name}/{year}", + "variable": "household_id", + "entity": "household", + "aggregation": "count", + "filter": { + "variable": "region", + "min": float(code), + "max": float(code) + 1.0, + }, + "value": round(hh_count), + "source": "ons", + "year": year, + "holdout": True, # holdout — approximate conversion + } + ) + return targets diff --git a/scripts/pool_frs.py b/scripts/pool_frs.py new file mode 100644 index 0000000..06b2ab4 --- /dev/null +++ b/scripts/pool_frs.py @@ -0,0 +1,147 @@ +"""Pool multiple FRS survey years into a single dataset for EFRS construction. + +Concatenates persons, benunits, and households CSVs from multiple years, +reindexing all IDs to avoid collisions and dividing weights by the number +of years pooled (since each year independently represents the full population). 
+ +Usage: + python scripts/pool_frs.py --years 2021 2022 2023 --data-dir ~/.policyengine-uk-data/frs --output data/frs_pooled/2023 +""" + +from __future__ import annotations + +import argparse +import csv +from pathlib import Path + + +def pool(years: list[int], data_dir: Path, output_dir: Path) -> None: + output_dir.mkdir(parents=True, exist_ok=True) + n_years = len(years) + + # First pass: determine ID offsets per year + offsets = [] # (person_offset, benunit_offset, household_offset) + p_off, b_off, h_off = 0, 0, 0 + for year in years: + year_dir = data_dir / str(year) + with open(year_dir / "persons.csv") as f: + n_persons = sum(1 for _ in f) - 1 + with open(year_dir / "benunits.csv") as f: + n_benunits = sum(1 for _ in f) - 1 + with open(year_dir / "households.csv") as f: + n_households = sum(1 for _ in f) - 1 + offsets.append((p_off, b_off, h_off)) + p_off += n_persons + b_off += n_benunits + h_off += n_households + + # Pool persons — use union of all columns across years + all_persons = [] + all_person_fields: list[str] = [] + for year, (po, _, _) in zip(years, offsets): + with open(data_dir / str(year) / "persons.csv") as f: + reader = csv.DictReader(f) + for col in reader.fieldnames: + if col not in all_person_fields: + all_person_fields.append(col) + for row in reader: + row["person_id"] = str(int(row["person_id"]) + po) + all_persons.append(row) + + with open(output_dir / "persons.csv", "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=all_person_fields, restval="0") + writer.writeheader() + writer.writerows(all_persons) + print(f" Pooled {len(all_persons):,} persons") + + # Pool benunits — harmonise column names across FRS format changes. + # FRS 2021/2022 uses: migration_seed, would_claim_uc, would_claim_cb, ... + # FRS 2023 uses: take_up_seed, reported_uc, reported_cb, ..., is_enr_uc, ... + # The EFRS writer and loader expect the would_claim_* format. 
+ _BU_RENAME = { + "take_up_seed": "migration_seed", + "reported_uc": "would_claim_uc", + "reported_cb": "would_claim_cb", + "reported_hb": "would_claim_hb", + "reported_pc": "would_claim_pc", + "reported_ctc": "would_claim_ctc", + "reported_wtc": "would_claim_wtc", + "reported_is": "would_claim_is", + } + # Columns to drop (is_enr_* are not used in the would_claim format) + _BU_DROP = { + "is_enr_uc", "is_enr_hb", "is_enr_pc", "is_enr_cb", + "is_enr_ctc", "is_enr_wtc", "is_enr_is", "is_enr_esa", "is_enr_jsa", + } + + all_benunits = [] + all_bu_fields: list[str] = [] + for year, (po, bo, ho) in zip(years, offsets): + with open(data_dir / str(year) / "benunits.csv") as f: + reader = csv.DictReader(f) + for col in reader.fieldnames: + mapped = _BU_RENAME.get(col, col) + if mapped not in _BU_DROP and mapped not in all_bu_fields: + all_bu_fields.append(mapped) + for row in reader: + # Rename columns + renamed = {} + for k, v in row.items(): + mapped = _BU_RENAME.get(k, k) + if mapped not in _BU_DROP: + renamed[mapped] = v + renamed["benunit_id"] = str(int(renamed["benunit_id"]) + bo) + renamed["household_id"] = str(int(renamed["household_id"]) + ho) + pids = renamed["person_ids"].split(";") + renamed["person_ids"] = ";".join(str(int(p) + po) for p in pids) + all_benunits.append(renamed) + + with open(output_dir / "benunits.csv", "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=all_bu_fields, restval="false") + writer.writeheader() + writer.writerows(all_benunits) + print(f" Pooled {len(all_benunits):,} benunits") + + # Pool households + all_households = [] + all_hh_fields: list[str] = [] + for year, (po, bo, ho) in zip(years, offsets): + with open(data_dir / str(year) / "households.csv") as f: + reader = csv.DictReader(f) + for col in reader.fieldnames: + if col not in all_hh_fields: + all_hh_fields.append(col) + for row in reader: + row["household_id"] = str(int(row["household_id"]) + ho) + bids = row["benunit_ids"].split(";") + row["benunit_ids"] = 
";".join(str(int(b) + bo) for b in bids) + pids = row["person_ids"].split(";") + row["person_ids"] = ";".join(str(int(p) + po) for p in pids) + row["weight"] = str(float(row["weight"]) / n_years) + all_households.append(row) + + with open(output_dir / "households.csv", "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=all_hh_fields, restval="0") + writer.writeheader() + writer.writerows(all_households) + print(f" Pooled {len(all_households):,} households (weights / {n_years})") + + total_w = sum(float(r["weight"]) for r in all_households) + n_uc = sum(1 for b in all_benunits if b.get("on_uc") == "true") + print(f" Total weight: {total_w:,.0f}, UC benunits: {n_uc:,}") + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__ or "") + parser.add_argument("--years", type=int, nargs="+", required=True) + parser.add_argument("--data-dir", type=Path, required=True) + parser.add_argument("--output", type=Path, required=True) + args = parser.parse_args() + + print(f"Pooling FRS years {args.years} from {args.data_dir}") + pool(args.years, args.data_dir, args.output) + print("Done.") + + +if __name__ == "__main__": + main() diff --git a/src/data/calibrate.rs b/src/data/calibrate.rs index 90ae3db..42975c1 100644 --- a/src/data/calibrate.rs +++ b/src/data/calibrate.rs @@ -204,6 +204,8 @@ fn household_variable( "net_financial_wealth" => h.net_financial_wealth, "gross_financial_wealth" => h.gross_financial_wealth, "savings" => h.savings, + "tenure_type" => h.tenure_type.to_rf_code(), + "region" => h.region.to_rf_code(), _ => 0.0, } } @@ -355,6 +357,21 @@ pub fn build_matrix( .sum::<f64>() }; + // Apply min/max range filter on the benunit variable value + if let Some(ref filter) = target.filter { + let filter_val = if filter.variable == target.variable { + bu_val + } else if let Some(results) = sim_results { + benunit_result_variable(&results.benunit_results[bu_id], &filter.variable) + .unwrap_or(0.0) + } else { + 0.0 + }; + if filter_val < 
filter.min || filter_val >= filter.max { + continue; + } + } + match target.aggregation.as_str() { "sum" => { contribution += bu_val; @@ -375,6 +392,13 @@ } "household" => { for (i, hh) in dataset.households.iter().enumerate() { + // Apply filter if present + if let Some(ref filter) = target.filter { + let filter_val = household_variable(hh, sim_results, i, &filter.variable); + if filter_val < filter.min || filter_val >= filter.max { + continue; + } + } match target.aggregation.as_str() { "sum" => { matrix[i][j] = household_variable(hh, sim_results, i, &target.variable); @@ -418,6 +442,9 @@ pub struct CalibrateConfig { pub eps: f64, pub dropout: f64, pub log_interval: usize, + /// Maximum ratio of calibrated weight to initial weight (e.g. 100.0 means + /// no household can exceed 100x its original weight). Set to 0 to disable. + pub max_weight_ratio: f64, } impl Default for CalibrateConfig { @@ -430,6 +457,7 @@ eps: 1e-8, dropout: 0.05, log_interval: 50, + max_weight_ratio: 100.0, } } } @@ -467,6 +495,22 @@ pub fn calibrate( .map(|&w| if w > 0.0 { w.ln() } else { 0.0 }) .collect(); + // Compute log-space bounds for weight clamping + let u_max: Vec<f64> = if config.max_weight_ratio > 0.0 { + initial_weights.iter() + .map(|&w| if w > 0.0 { (w * config.max_weight_ratio).ln() } else { (config.max_weight_ratio).ln() }) + .collect() + } else { + vec![f64::INFINITY; n_hh] + }; + let u_min: Vec<f64> = if config.max_weight_ratio > 0.0 { + initial_weights.iter() + .map(|&w| if w > 0.0 { (w / config.max_weight_ratio).ln() } else { -(config.max_weight_ratio).ln() }) + .collect() + } else { + vec![f64::NEG_INFINITY; n_hh] + }; + let mut m = vec![0.0f64; n_hh]; let mut v = vec![0.0f64; n_hh]; @@ -582,6 +626,8 @@ pub fn calibrate( let m_hat = m[i] / bc1; let v_hat = v[i] / bc2; u[i] -= config.lr * m_hat / (v_hat.sqrt() + config.eps); + // Clamp to max weight ratio bounds + u[i] = u[i].clamp(u_min[i], u_max[i]); } }