Skip to content
Merged
212 changes: 200 additions & 12 deletions scripts/build_targets/dwp.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""Fetch DWP benefit statistics from the Stat-Xplore API.
"""Fetch DWP benefit statistics from the Stat-Xplore API and forecasts.

Queries caseloads for UC (with subgroup breakdowns), PIP, pension credit,
carer's allowance, attendance allowance, state pension, ESA, and DLA.
Results are cached locally to avoid repeated API calls.

The Stat-Xplore snapshot (latest month) is then scaled to all calibration
years (2024-2029) using DWP's own caseload forecasts from the Spring
Statement 2025 benefit expenditure and caseload tables.

Requires STAT_XPLORE_API_KEY environment variable to be set.
See: https://stat-xplore.dwp.gov.uk/webapi/online-help/Open-Data-API.html
"""
Expand All @@ -15,6 +19,7 @@
import os
from pathlib import Path

import openpyxl
import requests

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -373,23 +378,206 @@ def _fetch_uc_breakdowns() -> list[dict]:
return targets


# DWP Spring Statement 2025 outturn-and-forecast workbook (benefit
# expenditure and caseload tables) published on gov.uk.
DWP_FORECAST_URL = (
    "https://assets.publishing.service.gov.uk/media/68f8923724fc2bb7eed11ac8/"
    "outturn-and-forecast-tables-spring-statement-2025.xlsx"
)
# Local cache path for the downloaded workbook (downloaded once, reused).
DWP_FORECAST_FILE = CACHE_DIR / "dwp_spring_statement_2025.xlsx"

# Calendar years that scaled targets are emitted for.
CALIBRATION_YEARS = range(2024, 2030) # 2024/25 through 2029/30

# Column 80 = 2024/25, ..., 85 = 2029/30 in the DWP forecast xlsx
_FORECAST_COL_TO_YEAR = {80: 2024, 81: 2025, 82: 2026, 83: 2027, 84: 2028, 85: 2029}


def _download_forecast() -> Path:
    """Return the path to the DWP forecast workbook, downloading on first use.

    The xlsx is cached at DWP_FORECAST_FILE, so subsequent calls are free.

    Raises:
        requests.HTTPError: if the download returns an error status.
    """
    if not DWP_FORECAST_FILE.exists():
        logger.info("Downloading DWP forecast tables...")
        response = requests.get(DWP_FORECAST_URL, timeout=60, allow_redirects=True)
        response.raise_for_status()
        # Ensure the cache directory exists before the first write.
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        DWP_FORECAST_FILE.write_bytes(response.content)
    return DWP_FORECAST_FILE


def _find_forecast_row(ws, label: str, start_row: int = 1, max_row: int = 200) -> int | None:
"""Find the first row in column B starting with label."""
for row in range(start_row, max_row + 1):
val = ws.cell(row=row, column=2).value
if val and str(val).strip().startswith(label):
return row
return None


def _read_forecast_row(ws, row: int) -> dict[int, float]:
    """Read one forecast row and return {calendar year: caseload in people}.

    Workbook values are thousands of people; blank or non-numeric cells
    are skipped.
    """
    caseloads: dict[int, float] = {}
    for column, year in _FORECAST_COL_TO_YEAR.items():
        raw = ws.cell(row=row, column=column).value
        # isinstance also excludes None, strings, and other junk cells.
        if raw is not None and isinstance(raw, (int, float)):
            caseloads[year] = 1e3 * float(raw)  # thousands → people
    return caseloads


def _parse_caseload_forecasts() -> dict[str, dict[int, float]]:
    """Parse DWP forecast xlsx for benefit caseload projections.

    Returns {benefit_key: {year: caseload}} for each benefit.  Returns an
    empty dict when the workbook cannot be downloaded, so callers can fall
    back to single-year targets.
    """
    try:
        path = _download_forecast()
    except Exception as e:
        logger.warning("Failed to download DWP forecast: %s", e)
        return {}

    wb = openpyxl.load_workbook(path, data_only=True)
    forecasts: dict[str, dict[int, float]] = {}

    # (sheet name, row label prefix, first row to scan, forecast key)
    simple_rows = [
        ("Universal Credit and equivalent", "Universal Credit", 48, "universal_credit"),
        ("Universal Credit and equivalent", "Universal Credit Carers Element", 48, "uc_carer_element"),
        ("Universal Credit and equivalent", "Universal Credit Housing Element", 48, "uc_housing_element"),
        # LCWRA from health element breakdown (label truncated in the sheet)
        ("Universal Credit and equivalent", "of which limited capability for work and work-related activi", 48, "uc_lcwra"),
        ("Universal Credit and equivalent", "Employment and Support Allowance", 48, "esa"),
        ("Disability benefits", "Personal Independence Payment", 50, "pip"),
        ("Disability benefits", "Disability Living Allowance", 50, "dla"),
        ("Disability benefits", "Attendance Allowance", 50, "attendance_allowance"),
        ("Carers Allowance", "Total", 14, "carers_allowance"),
        ("Pension Credit", "Total Pension Credit", 18, "pension_credit"),
        ("State Pension", "Total State Pension Caseload", 28, "state_pension"),
    ]
    try:
        for sheet_name, label, start_row, key in simple_rows:
            ws = wb[sheet_name]
            row = _find_forecast_row(ws, label, start_row=start_row)
            if row:
                forecasts[key] = _read_forecast_row(ws, row)

        # LCW needs a dedicated scan: its label is a strict prefix of the
        # LCWRA label, so _find_forecast_row("of which limited capability
        # for work") returns whichever of the two rows comes first.  The
        # old prefix-match + "related" guard silently dropped uc_lcw when
        # LCWRA appeared first — instead, look explicitly for the row
        # WITHOUT "related" in its label.
        ws = wb["Universal Credit and equivalent"]
        for row in range(48, 201):
            val = ws.cell(row=row, column=2).value
            if not val:
                continue
            label = str(val).strip()
            if label.startswith("of which limited capability for work") and "related" not in label:
                forecasts["uc_lcw"] = _read_forecast_row(ws, row)
                break
    finally:
        # Close the workbook even if a sheet is missing (KeyError above);
        # the original leaked the handle on that path.
        wb.close()

    return forecasts


def _scale_targets_to_years(
    base_targets: list[dict],
    forecasts: dict[str, dict[int, float]],
) -> list[dict]:
    """Scale stat-xplore snapshot targets to all calibration years using DWP forecasts.

    Each base target (a single-year stat-xplore snapshot, typically 2025)
    is projected to every calibration year by multiplying its value by the
    ratio of the forecast caseload in that year to the caseload in the
    snapshot year.  Targets without a usable forecast series keep their
    snapshot value in every year (ratio 1.0).
    """
    # Which forecast series drives the scaling of each target name.
    name_to_series = {
        "dwp/uc_total_claimants": "universal_credit",
        "dwp/pip_total_claimants": "pip",
        "dwp/pension_credit_claimants": "pension_credit",
        "dwp/carers_allowance_claimants": "carers_allowance",
        "dwp/attendance_allowance_claimants": "attendance_allowance",
        "dwp/state_pension_claimants": "state_pension",
        "dwp/esa_claimants": "esa",
        "dwp/dla_claimants": "dla",
        "dwp/uc_households_with_children": "universal_credit",
        "dwp/uc_households_lcwra": "uc_lcwra",
        "dwp/uc_households_lcw": "uc_lcw",
        "dwp/uc_households_with_carer": "uc_carer_element",
        "dwp/uc_households_with_housing": "uc_housing_element",
        # Family type breakdowns scale with total UC
        "dwp/uc_households_single_no_children": "universal_credit",
        "dwp/uc_households_single_with_children": "universal_credit",
        "dwp/uc_households_couple_no_children": "universal_credit",
        "dwp/uc_households_couple_with_children": "universal_credit",
    }

    projected: list[dict] = []
    for base in base_targets:
        snapshot_year = base["year"]
        series_key = name_to_series.get(base["name"])
        series = forecasts.get(series_key, {}) if series_key else {}
        snapshot_caseload = series.get(snapshot_year, 0)

        for year in CALIBRATION_YEARS:
            year_caseload = series.get(year, 0)
            # Fall back to a neutral ratio when either endpoint is missing.
            if snapshot_caseload > 0 and year_caseload > 0:
                ratio = year_caseload / snapshot_caseload
            else:
                ratio = 1.0

            entry = dict(base)
            entry["name"] = f"{base['name']}/{year}"
            entry["year"] = year
            entry["value"] = base["value"] * ratio
            projected.append(entry)

    return projected


def get_targets() -> list[dict]:
    """Build DWP caseload targets for all calibration years.

    Base (single-year) stat-xplore targets are loaded from the local cache
    when present; otherwise they are fetched via the API (requires
    STAT_XPLORE_API_KEY) and cached.  The base targets are then scaled to
    every calibration year using DWP's own caseload forecasts; when the
    forecast workbook is unavailable, the single-year base targets are
    returned unscaled.

    NOTE(review): this span was a mangled old/new diff merge — contradictory
    branch conditions, duplicated warning strings, and unreachable code
    after a return.  Reconstructed to the new-side flow shown by the diff.
    """
    if CACHE_FILE.exists():
        logger.info("Using cached DWP targets: %s", CACHE_FILE)
        base_targets = json.loads(CACHE_FILE.read_text())
    elif API_KEY:
        base_targets = []
        base_targets.extend(_fetch_simple_benefits())
        base_targets.extend(_fetch_uc_breakdowns())
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        CACHE_FILE.write_text(json.dumps(base_targets, indent=2))
        logger.info("Cached %d DWP base targets to %s", len(base_targets), CACHE_FILE)
    else:
        logger.warning(
            "STAT_XPLORE_API_KEY not set and no cache — skipping DWP targets. "
            "Set the env var and re-run to fetch from stat-xplore."
        )
        return []

    # Parse DWP caseload forecasts and scale base targets to all years.
    forecasts = _parse_caseload_forecasts()
    if forecasts:
        return _scale_targets_to_years(base_targets, forecasts)

    # Fallback: emit base targets as-is (single year only).
    logger.warning("No DWP forecasts available — emitting base targets for single year only")
    return base_targets
101 changes: 63 additions & 38 deletions scripts/build_targets/hmrc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
income-by-band targets for employment, self-employment, pensions, property,
dividends, and savings interest — both amounts and taxpayer counts per band.

The 2022-23 SPI snapshot is then scaled to all calibration years (2024-2030)
using OBR income growth indexes from sheets 3.5 and 1.6.

Source: https://www.gov.uk/government/statistics/income-tax-summarised-accounts-statistics
"""

Expand All @@ -24,7 +27,8 @@

# HMRC SPI 2022-23 collated tables (ODS)
SPI_URL = "https://assets.publishing.service.gov.uk/media/67cabb37ade26736dbf9ffe5/Collated_Tables_3_1_to_3_17_2223.ods"
SPI_YEAR = 2023 # FY 2022-23 → calendar 2023
SPI_YEAR = 2022 # FY 2022-23 → base year for growth indexing
CALIBRATION_YEARS = range(2024, 2031)

INCOME_BANDS_LOWER = [
12_570,
Expand Down Expand Up @@ -139,11 +143,16 @@ def get_targets() -> list[dict]:
logger.error("Failed to download HMRC SPI ODS: %s", e)
return targets

# Get OBR growth indexes for scaling to future years
from build_targets import obr

growth_indexes = obr.get_income_growth_indexes()

t36 = _parse_table_36(ods_bytes)
t37 = _parse_table_37(ods_bytes)
merged = t36.merge(t37, on="lower_bound", how="outer")

# Hold out count targets as validation (amounts used for training)
# Build base-year targets, then scale to all calibration years
for idx, row in merged.iterrows():
lower = int(row["lower_bound"])
upper = INCOME_BANDS_UPPER[idx] if idx < len(INCOME_BANDS_UPPER) else 1e12
Expand All @@ -154,43 +163,59 @@ def get_targets() -> list[dict]:
count_col = f"{variable}_count"

if amount_col in row.index and row[amount_col] > 0:
# SPI amounts are in £millions
targets.append(
{
"name": f"hmrc/{variable}_amount_{band_label}",
"variable": variable,
"entity": "person",
"aggregation": "sum",
"filter": {
"variable": "total_income",
"min": float(lower),
"max": float(upper),
},
"value": float(row[amount_col]) * 1e6,
"source": "hmrc_spi",
"year": SPI_YEAR,
"holdout": False,
}
)
base_amount = float(row[amount_col]) * 1e6 # £millions → £
var_index = growth_indexes.get(variable, {})

for year in CALIBRATION_YEARS:
# Scale amount by growth index relative to base year
scale = 1.0
if var_index:
base_idx = var_index.get(SPI_YEAR, 1.0)
year_idx = var_index.get(year, base_idx)
scale = year_idx / base_idx if base_idx > 0 else 1.0
scaled_amount = base_amount * scale

targets.append(
{
"name": f"hmrc/{variable}_amount_{band_label}/{year}",
"variable": variable,
"entity": "person",
"aggregation": "sum",
"filter": {
"variable": "total_income",
"min": float(lower),
"max": float(upper),
},
"value": scaled_amount,
"source": "hmrc_spi",
"year": year,
"holdout": False,
}
)

if count_col in row.index and row[count_col] > 0:
# SPI counts are in thousands — use as holdout validation
targets.append(
{
"name": f"hmrc/{variable}_count_{band_label}",
"variable": variable,
"entity": "person",
"aggregation": "count_nonzero",
"filter": {
"variable": "total_income",
"min": float(lower),
"max": float(upper),
},
"value": float(row[count_col]) * 1e3,
"source": "hmrc_spi",
"year": SPI_YEAR,
"holdout": True,
}
)
base_count = float(row[count_col]) * 1e3 # thousands → people

for year in CALIBRATION_YEARS:
# Counts are held constant — income growth changes amounts
# not the number of taxpayers per band (the band boundaries
# are fixed in nominal terms)
targets.append(
{
"name": f"hmrc/{variable}_count_{band_label}/{year}",
"variable": variable,
"entity": "person",
"aggregation": "count_nonzero",
"filter": {
"variable": "total_income",
"min": float(lower),
"max": float(upper),
},
"value": base_count,
"source": "hmrc_spi",
"year": year,
"holdout": True,
}
)

return targets
Loading
Loading