diff --git a/scripts/build_targets/dwp.py b/scripts/build_targets/dwp.py
index 05c3a58..5f6f1eb 100644
--- a/scripts/build_targets/dwp.py
+++ b/scripts/build_targets/dwp.py
@@ -1,9 +1,13 @@
-"""Fetch DWP benefit statistics from the Stat-Xplore API.
+"""Fetch DWP benefit statistics from the Stat-Xplore API and forecasts.
 
 Queries caseloads for UC (with subgroup breakdowns), PIP, pension credit,
 carer's allowance, attendance allowance, state pension, ESA, and DLA.
 Results are cached locally to avoid repeated API calls.
 
+The Stat-Xplore snapshot (latest month) is then scaled to all calibration
+years (2024-2029) using DWP's own caseload forecasts from the Spring
+Statement 2025 benefit expenditure and caseload tables.
+
 Requires STAT_XPLORE_API_KEY environment variable to be set.
 See: https://stat-xplore.dwp.gov.uk/webapi/online-help/Open-Data-API.html
 """
@@ -15,6 +19,7 @@
 import os
 from pathlib import Path
 
+import openpyxl
 import requests
 
 logger = logging.getLogger(__name__)
@@ -373,23 +378,251 @@ def _fetch_uc_breakdowns() -> list[dict]:
     return targets
 
 
+DWP_FORECAST_URL = (
+    "https://assets.publishing.service.gov.uk/media/68f8923724fc2bb7eed11ac8/"
+    "outturn-and-forecast-tables-spring-statement-2025.xlsx"
+)
+DWP_FORECAST_FILE = CACHE_DIR / "dwp_spring_statement_2025.xlsx"
+
+CALIBRATION_YEARS = range(2024, 2030)  # 2024/25 through 2029/30
+
+# Column 80 = 2024/25, ..., 85 = 2029/30 in the DWP forecast xlsx
+_FORECAST_COL_TO_YEAR = {80: 2024, 81: 2025, 82: 2026, 83: 2027, 84: 2028, 85: 2029}
+
+
+def _download_forecast() -> Path:
+    """Return the cached DWP forecast xlsx, downloading it first if needed.
+
+    The download is written to a temporary sibling file and then renamed, so
+    an interrupted write cannot leave a truncated workbook in the cache.
+    """
+    if DWP_FORECAST_FILE.exists():
+        return DWP_FORECAST_FILE
+    logger.info("Downloading DWP forecast tables...")
+    r = requests.get(DWP_FORECAST_URL, timeout=60, allow_redirects=True)
+    r.raise_for_status()
+    CACHE_DIR.mkdir(parents=True, exist_ok=True)
+    tmp_path = DWP_FORECAST_FILE.with_name(DWP_FORECAST_FILE.name + ".part")
+    tmp_path.write_bytes(r.content)
+    tmp_path.replace(DWP_FORECAST_FILE)
+    return DWP_FORECAST_FILE
+
+
+def _find_forecast_row(ws, label: str, start_row: int = 1, max_row: int = 200) -> int | None:
+    """Return the first row whose column-B text starts with ``label``.
+
+    Rows ``start_row`` to ``max_row`` (inclusive) are scanned. Matching is by
+    prefix, so a label that is itself a prefix of another row's label can
+    match that other row. Returns None when nothing matches.
+    """
+    for row in range(start_row, max_row + 1):
+        val = ws.cell(row=row, column=2).value
+        if val and str(val).strip().startswith(label):
+            return row
+    return None
+
+
+def _read_forecast_row(ws, row: int) -> dict[int, float]:
+    """Read caseload values (thousands) from a forecast row.
+
+    Returns ``{year: people}`` for the columns in ``_FORECAST_COL_TO_YEAR``;
+    cells that do not hold a number are skipped.
+    """
+    result = {}
+    for col, year in _FORECAST_COL_TO_YEAR.items():
+        val = ws.cell(row=row, column=col).value
+        if isinstance(val, (int, float)):
+            result[year] = float(val) * 1e3  # thousands → people
+    return result
+
+
+def _parse_caseload_forecasts() -> dict[str, dict[int, float]]:
+    """Parse DWP forecast xlsx for benefit caseload projections.
+
+    Returns {benefit_key: {year: caseload}} for each benefit whose row was
+    found, or {} when the workbook cannot be downloaded.
+    """
+    try:
+        path = _download_forecast()
+    except Exception as e:
+        logger.warning("Failed to download DWP forecast: %s", e)
+        return {}
+
+    wb = openpyxl.load_workbook(path, data_only=True)
+    forecasts: dict[str, dict[int, float]] = {}
+    try:
+        # UC caseloads from "Universal Credit and equivalent" sheet
+        ws = wb["Universal Credit and equivalent"]
+        # NOTE(review): prefix match — this would also match the "Universal
+        # Credit Carers/Housing Element" rows if they come first; confirm
+        # the row order in the sheet.
+        uc_row = _find_forecast_row(ws, "Universal Credit", start_row=48)
+        if uc_row:
+            forecasts["universal_credit"] = _read_forecast_row(ws, uc_row)
+
+        uc_carer_row = _find_forecast_row(ws, "Universal Credit Carers Element", start_row=48)
+        if uc_carer_row:
+            forecasts["uc_carer_element"] = _read_forecast_row(ws, uc_carer_row)
+
+        uc_housing_row = _find_forecast_row(ws, "Universal Credit Housing Element", start_row=48)
+        if uc_housing_row:
+            forecasts["uc_housing_element"] = _read_forecast_row(ws, uc_housing_row)
+
+        # LCWRA from health element breakdown
+        lcwra_row = _find_forecast_row(
+            ws, "of which limited capability for work and work-related activi", start_row=48
+        )
+        if lcwra_row:
+            forecasts["uc_lcwra"] = _read_forecast_row(ws, lcwra_row)
+
+        # The LCW label is a prefix of the LCWRA label, so the first prefix
+        # match may be the LCWRA row. Keep scanning past such rows instead
+        # of dropping the LCW series.
+        lcw_label = "of which limited capability for work"
+        lcw_row = _find_forecast_row(ws, lcw_label, start_row=48)
+        while lcw_row and "related" in str(ws.cell(row=lcw_row, column=2).value):
+            lcw_row = _find_forecast_row(ws, lcw_label, start_row=lcw_row + 1)
+        if lcw_row:
+            forecasts["uc_lcw"] = _read_forecast_row(ws, lcw_row)
+
+        esa_row = _find_forecast_row(ws, "Employment and Support Allowance", start_row=48)
+        if esa_row:
+            forecasts["esa"] = _read_forecast_row(ws, esa_row)
+
+        # Disability benefits sheet
+        ws = wb["Disability benefits"]
+        pip_row = _find_forecast_row(ws, "Personal Independence Payment", start_row=50)
+        if pip_row:
+            forecasts["pip"] = _read_forecast_row(ws, pip_row)
+
+        dla_row = _find_forecast_row(ws, "Disability Living Allowance", start_row=50)
+        if dla_row:
+            forecasts["dla"] = _read_forecast_row(ws, dla_row)
+
+        aa_row = _find_forecast_row(ws, "Attendance Allowance", start_row=50)
+        if aa_row:
+            forecasts["attendance_allowance"] = _read_forecast_row(ws, aa_row)
+
+        # Carer's Allowance sheet
+        ws = wb["Carers Allowance"]
+        ca_total_row = _find_forecast_row(ws, "Total", start_row=14)
+        if ca_total_row:
+            forecasts["carers_allowance"] = _read_forecast_row(ws, ca_total_row)
+
+        # Pension Credit sheet
+        ws = wb["Pension Credit"]
+        pc_row = _find_forecast_row(ws, "Total Pension Credit", start_row=18)
+        if pc_row:
+            forecasts["pension_credit"] = _read_forecast_row(ws, pc_row)
+
+        # State Pension sheet
+        ws = wb["State Pension"]
+        sp_row = _find_forecast_row(ws, "Total State Pension Caseload", start_row=28)
+        if sp_row:
+            forecasts["state_pension"] = _read_forecast_row(ws, sp_row)
+    finally:
+        # Close the workbook even if an expected sheet is missing (KeyError)
+        wb.close()
+    return forecasts
+
+
+def _scale_targets_to_years(
+    base_targets: list[dict],
+    forecasts: dict[str, dict[int, float]],
+) -> list[dict]:
+    """Scale stat-xplore snapshot targets to all calibration years using DWP forecasts.
+
+    For each base target (from stat-xplore, typically 2025), compute a scaling
+    factor from the DWP forecast caseload trajectory and emit a target for each year.
+
+    Each emitted target is a copy of the base target with ``/{year}`` appended
+    to its name, ``year`` replaced, and ``value`` multiplied by
+    forecast[year] / forecast[base year]. When either forecast value is
+    missing or non-positive the base value is emitted unscaled.
+    """
+    # Map target names to forecast keys for scaling
+    _FORECAST_KEY = {
+        "dwp/uc_total_claimants": "universal_credit",
+        "dwp/pip_total_claimants": "pip",
+        "dwp/pension_credit_claimants": "pension_credit",
+        "dwp/carers_allowance_claimants": "carers_allowance",
+        "dwp/attendance_allowance_claimants": "attendance_allowance",
+        "dwp/state_pension_claimants": "state_pension",
+        "dwp/esa_claimants": "esa",
+        "dwp/dla_claimants": "dla",
+        "dwp/uc_households_with_children": "universal_credit",
+        "dwp/uc_households_lcwra": "uc_lcwra",
+        "dwp/uc_households_lcw": "uc_lcw",
+        "dwp/uc_households_with_carer": "uc_carer_element",
+        "dwp/uc_households_with_housing": "uc_housing_element",
+        # Family type breakdowns scale with total UC
+        "dwp/uc_households_single_no_children": "universal_credit",
+        "dwp/uc_households_single_with_children": "universal_credit",
+        "dwp/uc_households_couple_no_children": "universal_credit",
+        "dwp/uc_households_couple_with_children": "universal_credit",
+    }
+
+    scaled: list[dict] = []
+    for target in base_targets:
+        base_year = target["year"]
+        forecast_key = _FORECAST_KEY.get(target["name"])
+        forecast_series = forecasts.get(forecast_key, {}) if forecast_key else {}
+        base_forecast = forecast_series.get(base_year, 0)
+        if base_forecast <= 0:
+            # Without a base-year forecast every year falls back to scale 1.0;
+            # say so instead of silently emitting flat targets.
+            logger.warning(
+                "No DWP forecast for %s in base year %s; emitting unscaled targets",
+                target["name"],
+                base_year,
+            )
+
+        for year in CALIBRATION_YEARS:
+            year_forecast = forecast_series.get(year, 0)
+            if base_forecast > 0 and year_forecast > 0:
+                scale = year_forecast / base_forecast
+            else:
+                scale = 1.0
+
+            t = dict(target)
+            t["name"] = f"{target['name']}/{year}"
+            t["year"] = year
+            t["value"] = target["value"] * scale
+            scaled.append(t)
+
+    return scaled
+
+
 def get_targets() -> list[dict]:
+    """Return DWP caseload targets for every calibration year.
+
+    Base targets come from the local cache or, failing that, the Stat-Xplore
+    API; they are then scaled with the DWP caseload forecasts. Returns the
+    unscaled base targets if no forecasts are available, and [] if there is
+    neither a cache nor an API key.
+    """
     if CACHE_FILE.exists():
         logger.info("Using cached DWP targets: %s", CACHE_FILE)
-        return json.loads(CACHE_FILE.read_text())
-
-    if not API_KEY:
+        base_targets = json.loads(CACHE_FILE.read_text())
+    elif API_KEY:
+        base_targets = []
+        base_targets.extend(_fetch_simple_benefits())
+        base_targets.extend(_fetch_uc_breakdowns())
+        CACHE_DIR.mkdir(parents=True, exist_ok=True)
+        CACHE_FILE.write_text(json.dumps(base_targets, indent=2))
+        logger.info("Cached %d DWP base targets to %s", len(base_targets), CACHE_FILE)
+    else:
         logger.warning(
-            "STAT_XPLORE_API_KEY not set — skipping DWP targets. "
+            "STAT_XPLORE_API_KEY not set and no cache — skipping DWP targets. "
             "Set the env var and re-run to fetch from stat-xplore."
         )
         return []
 
-    targets = []
-    targets.extend(_fetch_simple_benefits())
-    targets.extend(_fetch_uc_breakdowns())
+    # Parse DWP caseload forecasts and scale base targets to all years
+    forecasts = _parse_caseload_forecasts()
+    if forecasts:
+        return _scale_targets_to_years(base_targets, forecasts)
 
-    CACHE_DIR.mkdir(parents=True, exist_ok=True)
-    CACHE_FILE.write_text(json.dumps(targets, indent=2))
-    logger.info("Cached %d DWP targets to %s", len(targets), CACHE_FILE)
-    return targets
+    # Fallback: emit base targets as-is (single year only)
+    logger.warning("No DWP forecasts available — emitting base targets for single year only")
+    return base_targets
diff --git a/scripts/build_targets/hmrc.py b/scripts/build_targets/hmrc.py
index 8b5182d..a9986c8 100644
--- a/scripts/build_targets/hmrc.py
+++ b/scripts/build_targets/hmrc.py
@@ -4,6 +4,9 @@
 income-by-band targets for employment, self-employment, pensions, property,
 dividends, and savings interest — both amounts and taxpayer counts per band.
 
+The 2022-23 SPI snapshot is then scaled to all calibration years (2024-2030)
+using OBR income growth indexes from sheets 3.5 and 1.6.
+
 Source: https://www.gov.uk/government/statistics/income-tax-summarised-accounts-statistics
 """
 
@@ -24,7 +27,8 @@
 
 # HMRC SPI 2022-23 collated tables (ODS)
 SPI_URL = "https://assets.publishing.service.gov.uk/media/67cabb37ade26736dbf9ffe5/Collated_Tables_3_1_to_3_17_2223.ods"
-SPI_YEAR = 2023  # FY 2022-23 → calendar 2023
+SPI_YEAR = 2022  # FY 2022-23 → base year for growth indexing
+CALIBRATION_YEARS = range(2024, 2031)
 
 INCOME_BANDS_LOWER = [
     12_570,
@@ -139,11 +143,16 @@ def get_targets() -> list[dict]:
         logger.error("Failed to download HMRC SPI ODS: %s", e)
         return targets
 
+    # Get OBR growth indexes for scaling to future years
+    from build_targets import obr
+
+    growth_indexes = obr.get_income_growth_indexes()
+
     t36 = _parse_table_36(ods_bytes)
     t37 = _parse_table_37(ods_bytes)
     merged = t36.merge(t37, on="lower_bound", how="outer")
 
-    # Hold out count targets as validation (amounts used for training)
+    # Build base-year targets, then scale to all calibration years
     for idx, row in merged.iterrows():
         lower = int(row["lower_bound"])
         upper = INCOME_BANDS_UPPER[idx] if idx < len(INCOME_BANDS_UPPER) else 1e12
@@ -154,43 +163,60 @@ def get_targets() -> list[dict]:
             count_col = f"{variable}_count"
 
             if amount_col in row.index and row[amount_col] > 0:
-                # SPI amounts are in £millions
-                targets.append(
-                    {
-                        "name": f"hmrc/{variable}_amount_{band_label}",
-                        "variable": variable,
-                        "entity": "person",
-                        "aggregation": "sum",
-                        "filter": {
-                            "variable": "total_income",
-                            "min": float(lower),
-                            "max": float(upper),
-                        },
-                        "value": float(row[amount_col]) * 1e6,
-                        "source": "hmrc_spi",
-                        "year": SPI_YEAR,
-                        "holdout": False,
-                    }
-                )
+                base_amount = float(row[amount_col]) * 1e6  # £millions → £
+                var_index = growth_indexes.get(variable, {})
+
+                for year in CALIBRATION_YEARS:
+                    # Scale amount by growth index relative to base year
+                    scale = 1.0
+                    if var_index:
+                        base_idx = var_index.get(SPI_YEAR, 1.0)
+                        year_idx = var_index.get(year, base_idx)
+                        scale = year_idx / base_idx if base_idx > 0 else 1.0
+                    scaled_amount = base_amount * scale
+
+                    targets.append(
+                        {
+                            "name": f"hmrc/{variable}_amount_{band_label}/{year}",
+                            "variable": variable,
+                            "entity": "person",
+                            "aggregation": "sum",
+                            "filter": {
+                                "variable": "total_income",
+                                "min": float(lower),
+                                "max": float(upper),
+                            },
+                            "value": scaled_amount,
+                            "source": "hmrc_spi",
+                            "year": year,
+                            "holdout": False,
+                        }
+                    )
 
             if count_col in row.index and row[count_col] > 0:
-                # SPI counts are in thousands — use as holdout validation
-                targets.append(
-                    {
-                        "name": f"hmrc/{variable}_count_{band_label}",
-                        "variable": variable,
-                        "entity": "person",
-                        "aggregation": "count_nonzero",
-                        "filter": {
-                            "variable": "total_income",
-                            "min": float(lower),
-                            "max": float(upper),
-                        },
-                        "value": float(row[count_col]) * 1e3,
-                        "source": "hmrc_spi",
-                        "year": SPI_YEAR,
-                        "holdout": True,
-                    }
-                )
+                base_count = float(row[count_col]) * 1e3  # thousands → people
+
+                for year in CALIBRATION_YEARS:
+                    # Counts are held at the SPI base-year level (no per-band
+                    # count forecast is applied here). NOTE(review): with bands
+                    # fixed in nominal terms, income growth would shift people
+                    # between bands, so treat these as approximate.
+                    targets.append(
+                        {
+                            "name": f"hmrc/{variable}_count_{band_label}/{year}",
+                            "variable": variable,
+                            "entity": "person",
+                            "aggregation": "count_nonzero",
+                            "filter": {
+                                "variable": "total_income",
+                                "min": float(lower),
+                                "max": float(upper),
+                            },
+                            "value": base_count,
+                            "source": "hmrc_spi",
+                            "year": year,
+                            "holdout": True,
+                        }
+                    )
 
     return targets
diff --git a/scripts/build_targets/obr.py b/scripts/build_targets/obr.py
index 065a2de..89a3621 100644
--- a/scripts/build_targets/obr.py
+++ b/scripts/build_targets/obr.py
@@ -3,6 +3,7 @@
 Sources (local xlsx files in data/obr/):
 - Receipts: efo-march-2026-detailed-forecast-tables-receipts.xlsx
 - Expenditure: efo-march-2026-detailed-forecast-tables-expenditure.xlsx
+- Economy: efo-march-2026-detailed-forecast-tables-economy.xlsx
 """
 
 from __future__ import annotations
@@ -16,6 +17,7 @@
 
 RECEIPTS_FILE = OBR_DIR / "efo-march-2026-detailed-forecast-tables-receipts.xlsx"
 EXPENDITURE_FILE = OBR_DIR / "efo-march-2026-detailed-forecast-tables-expenditure.xlsx"
+ECONOMY_FILE = OBR_DIR / "efo-march-2026-detailed-forecast-tables-economy.xlsx"
 
 # Sheet 3.8 (cash receipts): D=2024-25, E=2025-26, ..., J=2030-31
 _RECEIPTS_COL_TO_YEAR = {
@@ -54,6 +56,106 @@
 }
 
 
+def _compound_growth_index(rates: dict[int, object], base_year: int) -> dict[int, float]:
+    """Compound annual % growth rates into an index where ``base_year`` is 1.0.
+
+    Years are compounded consecutively from ``base_year + 1``. Compounding
+    stops at the first year whose rate is missing or non-numeric, so later
+    years are omitted rather than restarted from 1.0.
+    """
+    index = {base_year: 1.0}
+    year = base_year + 1
+    while isinstance(rates.get(year), (int, float)):
+        index[year] = index[year - 1] * (1 + rates[year] / 100.0)
+        year += 1
+    return index
+
+
+def get_income_growth_indexes() -> dict[str, dict[int, float]]:
+    """Return cumulative growth indexes relative to 2022 for each income type.
+
+    Uses OBR sheet 3.5 (self-employment, dividend, property, savings growth
+    rates), sheet 1.6 (wages & salaries levels) and sheet 1.7 (CPI) to build
+    indexes that can scale the HMRC SPI 2022-23 snapshot to other years.
+    Years are fiscal-year start years, so 2022 (FY 2022-23) has index 1.0.
+
+    Returns e.g. {"employment_income": {2022: 1.0, 2023: 1.07, ...}, ...}.
+    Income types whose source workbook is not on disk are left out.
+    """
+    indexes: dict[str, dict[int, float]] = {}
+
+    # ── Wages & salaries from sheet 1.6 (levels, £bn) ──
+    if ECONOMY_FILE.exists():
+        wb = openpyxl.load_workbook(ECONOMY_FILE, data_only=True)
+        try:
+            ws = wb["1.6"]
+            wage_levels: dict[int, float] = {}
+            for row in range(4, 200):
+                b = ws.cell(row=row, column=2).value
+                if b is None:
+                    continue
+                year = _parse_fiscal_year(str(b))
+                if year is not None and 2022 <= year <= 2030:
+                    val = ws.cell(row=row, column=14).value  # Col N = wages & salaries
+                    if isinstance(val, (int, float)):
+                        wage_levels[year] = float(val)
+        finally:
+            wb.close()
+        if 2022 in wage_levels:
+            base = wage_levels[2022]
+            indexes["employment_income"] = {y: v / base for y, v in wage_levels.items()}
+
+    # ── Growth rates from sheet 3.5 ──
+    # Cols: C=2023-24, D=2024-25, ..., J=2030-31
+    _35_col_to_year = {3: 2023, 4: 2024, 5: 2025, 6: 2026, 7: 2027, 8: 2028, 9: 2029, 10: 2030}
+    _35_rows = {
+        "self_employment_income": 6,
+        "dividend_income": 7,
+        "property_income": 8,
+        "savings_interest": 9,
+    }
+    if RECEIPTS_FILE.exists():
+        wb = openpyxl.load_workbook(RECEIPTS_FILE, data_only=True)
+        try:
+            ws = wb["3.5"]
+            for variable, data_row in _35_rows.items():
+                # Build cumulative index from growth rates (% p.a.)
+                # Base year is 2022 (FY 2022-23), so index[2022] = 1.0
+                rates = {
+                    year: ws.cell(row=data_row, column=col).value
+                    for col, year in _35_col_to_year.items()
+                }
+                indexes[variable] = _compound_growth_index(rates, base_year=2022)
+        finally:
+            wb.close()
+
+    # State pension and private pension: use CPI as a proxy (triple lock ≈ max of CPI, AWE, 2.5%)
+    # For calibration purposes CPI is a reasonable approximation
+    if ECONOMY_FILE.exists():
+        wb = openpyxl.load_workbook(ECONOMY_FILE, data_only=True)
+        try:
+            ws = wb["1.7"]
+            # CPI growth is in a fiscal year row format too
+            cpi_rates: dict[int, float] = {}
+            for row in range(4, 200):
+                b = ws.cell(row=row, column=2).value
+                if b is None:
+                    continue
+                year = _parse_fiscal_year(str(b))
+                if year is not None and 2023 <= year <= 2030:
+                    rate = ws.cell(row=row, column=4).value  # Col D = CPI
+                    if isinstance(rate, (int, float)):
+                        cpi_rates[year] = float(rate)
+        finally:
+            wb.close()
+        cpi_idx = _compound_growth_index(cpi_rates, base_year=2022)
+        indexes["state_pension"] = cpi_idx
+        # Separate copy so mutating one series cannot change the other
+        indexes["private_pension_income"] = dict(cpi_idx)
+
+    return indexes
+
+
 def _find_row(ws, label: str, col: str = "B", max_row: int = 70) -> int | None:
     for row in range(1, max_row + 1):
         val = ws[f"{col}{row}"].value
@@ -316,6 +418,154 @@ def _parse_council_tax() -> list[dict]:
     return targets
 
 
+def _parse_fiscal_year(label: str) -> int | None:
+    """Parse '2025-26' → 2025, or '2025/26' → 2025."""
+    s = str(label).strip()
+    for sep in ["-", "/"]:
+        if sep in s:
+            parts = s.split(sep)
+            try:
+                return int(parts[0])
+            except ValueError:
+                return None
+    return None
+
+
+def _read_fiscal_year_rows(
+    ws, col_map: dict[str, str], max_row: int = 200
+) -> list[tuple[int, dict[str, float]]]:
+    """Scan column B for fiscal year labels (e.g. '2025-26') and read values.
+
+    col_map maps a descriptive key to a column letter, e.g. {"employment": "C"}.
+    Rows 4 to max_row (inclusive) are scanned; years before 2020 are skipped.
+    Returns [(year, {key: value}), ...] for rows with at least one numeric value.
+    """
+    results = []
+    for row in range(4, max_row + 1):
+        b = ws[f"B{row}"].value
+        if b is None:
+            continue
+        year = _parse_fiscal_year(b)
+        if year is None or year < 2020:
+            continue
+        vals = {}
+        for key, col in col_map.items():
+            v = ws[f"{col}{row}"].value
+            if isinstance(v, (int, float)):
+                vals[key] = float(v)
+        if vals:
+            results.append((year, vals))
+    return results
+
+
+def _parse_economy() -> list[dict]:
+    """Parse economy tables for labour market and income aggregates.
+
+    Reads the fiscal-year rows of sheet 1.6 and emits, per year, targets for
+    the employment count, total wages and salaries, total self-employment
+    income and (as a holdout) the self-employed count.
+    """
+    wb = openpyxl.load_workbook(ECONOMY_FILE, data_only=True)
+    targets = []
+    try:
+        # ── 1.6 Labour market (fiscal year rows) ──
+        ws = wb["1.6"]
+        for year, vals in _read_fiscal_year_rows(
+            ws,
+            {
+                "employment": "C",  # Employment 16+, millions
+                "employees": "E",  # Employees 16+, millions
+                "unemployment": "F",  # ILO unemployment, millions
+                "total_hours": "J",  # Total hours worked, millions per week
+                "comp_employees": "M",  # Compensation of employees, £bn
+                "wages_salaries": "N",  # Wages and salaries, £bn
+                "employer_social": "O",  # Employer social contributions, £bn
+                "mixed_income": "P",  # Mixed income (self-employment), £bn
+            },
+        ):
+            # Employment count: people with employment_income > 0
+            # NOTE(review): "employment" looks like total employment (the
+            # self-employed count below is derived as employment - employees),
+            # so it may overstate people with employment_income > 0 — confirm.
+            if "employment" in vals:
+                targets.append(
+                    {
+                        "name": f"obr/employment_count/{year}",
+                        "variable": "employment_income",
+                        "entity": "person",
+                        "aggregation": "count_nonzero",
+                        "filter": None,
+                        "value": vals["employment"] * 1e6,
+                        "source": "obr",
+                        "year": year,
+                        "holdout": False,
+                    }
+                )
+
+            # Total wages and salaries: sum of employment_income
+            if "wages_salaries" in vals:
+                targets.append(
+                    {
+                        "name": f"obr/wages_salaries/{year}",
+                        "variable": "employment_income",
+                        "entity": "person",
+                        "aggregation": "sum",
+                        "filter": None,
+                        "value": vals["wages_salaries"] * 1e9,
+                        "source": "obr",
+                        "year": year,
+                        "holdout": False,
+                    }
+                )
+
+            # Employer social contributions — skipped: OBR figure includes pensions
+            # and other employer costs beyond NI. employer_ni already covered by
+            # NI receipts target.
+
+            # Mixed income ≈ total self-employment income
+            if "mixed_income" in vals:
+                targets.append(
+                    {
+                        "name": f"obr/self_employment_income/{year}",
+                        "variable": "self_employment_income",
+                        "entity": "person",
+                        "aggregation": "sum",
+                        "filter": None,
+                        "value": vals["mixed_income"] * 1e9,
+                        "source": "obr",
+                        "year": year,
+                        "holdout": False,
+                    }
+                )
+
+            # Self-employment count = employment - employees. Emitted only
+            # when both series are present: a placeholder value of 0 would
+            # be a false calibration target.
+            if "mixed_income" in vals and "employment" in vals and "employees" in vals:
+                targets.append(
+                    {
+                        "name": f"obr/self_employed_count/{year}",
+                        "variable": "self_employment_income",
+                        "entity": "person",
+                        "aggregation": "count_nonzero",
+                        "filter": None,
+                        "value": (vals["employment"] - vals["employees"]) * 1e6,
+                        "source": "obr",
+                        "year": year,
+                        "holdout": True,
+                    }
+                )
+
+            # Total hours worked — skipped: hours_worked not populated in EFRS.
+
+        # RHDI (1.12) excluded — OBR national accounts definition differs from
+        # HBAI net income (includes imputed rent, NPISH, etc.).
+        # Housing stock (1.16) excluded — overlaps with ONS total_households.
+    finally:
+        wb.close()
+    return targets
+
+
 def get_targets() -> list[dict]:
     targets = []
     if RECEIPTS_FILE.exists():
@@ -324,4 +574,6 @@ def get_targets() -> list[dict]:
     if EXPENDITURE_FILE.exists():
         targets.extend(_parse_welfare())
         targets.extend(_parse_council_tax())
+    if ECONOMY_FILE.exists():
+        targets.extend(_parse_economy())
     return targets
diff --git a/src/data/calibrate.rs b/src/data/calibrate.rs
index 8de515b..90ae3db 100644
--- a/src/data/calibrate.rs
+++ b/src/data/calibrate.rs
@@ -296,7 +296,10 @@ pub fn build_matrix(
 
     for (j, target) in targets.iter().enumerate() {
         target_values[j] = target.value;
-        training_mask[j] = !target.holdout;
+        // All targets participate in training. The holdout flag is only
+        // used for separate error reporting, not gradient exclusion. Set the
+        // mask explicitly so this does not depend on how it was initialized.
+        training_mask[j] = true;
 
         match target.entity.as_str() {
             "person" => {