From d9ee5355ac45c0b074140343933f1db22c0fe07e Mon Sep 17 00:00:00 2001 From: Griffin Sharps Date: Mon, 22 Dec 2025 21:12:15 +0000 Subject: [PATCH 1/5] Including helper program to set up PR --- pyproject.toml | 1 + smart_meter_analysis/run_manifest.py | 152 +++++++++++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100644 smart_meter_analysis/run_manifest.py diff --git a/pyproject.toml b/pyproject.toml index 902af39..59428ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -128,6 +128,7 @@ extend-ignore = ["TRY003", "TRY300", "TRY400"] "scripts/run_comed_pipeline.py" = ["C901", "S603"] "scripts/diagnostics/*.py" = ["C901"] "smart_meter_analysis/pipeline_validator.py" = ["C901", "PGH003"] +"smart_meter_analysis/run_manifest.py" = ["S603", "S607"] "tests/test_aws_transform.py" = ["E402"] "tests/test_census.py" = ["E402"] "scripts/data_collection/*" = ["C901"] diff --git a/smart_meter_analysis/run_manifest.py b/smart_meter_analysis/run_manifest.py new file mode 100644 index 0000000..a1536c4 --- /dev/null +++ b/smart_meter_analysis/run_manifest.py @@ -0,0 +1,152 @@ +# smart_meter_analysis/run_manifest.py +from __future__ import annotations + +import json +import platform +import sys +from collections.abc import Iterable +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path + + +@dataclass(frozen=True) +class Stage2RunManifest: + """Lightweight, reproducible metadata for a Stage 2 run.""" + + created_utc: str + python: str + platform: str + git_commit: str | None + command: str + output_dir: str + + # Inputs + clusters_path: str + crosswalk_path: str + census_cache_path: str + + # Key parameters + baseline_cluster: str | int | None + min_obs_per_bg: int + alpha: float + weight_column: str | None + + # Predictor handling + predictors_total_detected: int | None + predictors_used: list[str] + predictors_excluded_all_null: list[str] + + # Dataset sizes + block_groups_total: int | None + block_groups_after_min_obs: int | None + block_groups_after_drop_null_predictors: int | None + + # Outputs + regression_data_path: str | None + regression_report_path: str | None + run_log_path: str | None + + +def _safe_git_commit(repo_root: Path) -> str | None: + """Best-effort git commit retrieval without depending on GitPython.""" + try: + import subprocess + + r = subprocess.run( + ["git", "rev-parse", "HEAD"], + cwd=str(repo_root), + check=False, + capture_output=True, + text=True, + ) + if r.returncode == 0: + return r.stdout.strip() or None + return None + except Exception: + return None + + +def write_stage2_manifest( + *, + output_dir: str | Path, + command: str, + repo_root: str | Path | None = None, + clusters_path: str | Path, + crosswalk_path: str | Path, + census_cache_path: str | Path, + baseline_cluster: str | int | None, + min_obs_per_bg: int, + alpha: float, + weight_column: str | None, + predictors_detected: int | None, + predictors_used: Iterable[str], + predictors_excluded_all_null: Iterable[str], + block_groups_total: int | None, + block_groups_after_min_obs: int | None, + block_groups_after_drop_null_predictors: int | None, + regression_data_path: str | Path | None, + regression_report_path: str | Path | None, + run_log_path: str | Path | None, +) -> Path: + """ + Writes: + - stage2_manifest.json : run metadata + - predictors_used.txt : final predictor list (stable across runs) + - predictors_excluded_all_null.txt : excluded predictors with 100% nulls + """ + out = Path(output_dir) + out.mkdir(parents=True, exist_ok=True) + + 
predictors_used_list = sorted(dict.fromkeys(list(predictors_used))) + excluded_list = sorted(dict.fromkeys(list(predictors_excluded_all_null))) + + # Persist predictor lists (the key stability artifact) + (out / "predictors_used.txt").write_text("\n".join(predictors_used_list) + "\n", encoding="utf-8") + (out / "predictors_excluded_all_null.txt").write_text("\n".join(excluded_list) + "\n", encoding="utf-8") + + # Build manifest + created_utc = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + repo_root_path = Path(repo_root) if repo_root is not None else None + git_commit = _safe_git_commit(repo_root_path) if repo_root_path else None + + manifest = Stage2RunManifest( + created_utc=created_utc, + python=sys.version.replace("\n", " "), + platform=f"{platform.system()} {platform.release()} ({platform.machine()})", + git_commit=git_commit, + command=command, + output_dir=str(out), + clusters_path=str(clusters_path), + crosswalk_path=str(crosswalk_path), + census_cache_path=str(census_cache_path), + baseline_cluster=baseline_cluster, + min_obs_per_bg=min_obs_per_bg, + alpha=alpha, + weight_column=weight_column, + predictors_total_detected=predictors_detected, + predictors_used=predictors_used_list, + predictors_excluded_all_null=excluded_list, + block_groups_total=block_groups_total, + block_groups_after_min_obs=block_groups_after_min_obs, + block_groups_after_drop_null_predictors=block_groups_after_drop_null_predictors, + regression_data_path=str(regression_data_path) if regression_data_path else None, + regression_report_path=str(regression_report_path) if regression_report_path else None, + run_log_path=str(run_log_path) if run_log_path else None, + ) + + manifest_path = out / "stage2_manifest.json" + manifest_path.write_text(json.dumps(manifest.__dict__, indent=2, sort_keys=True) + "\n", encoding="utf-8") + return manifest_path + + +def load_persisted_predictors(output_dir: str | Path) -> list[str] | None: + """ + If `predictors_used.txt` exists, return that list to reuse exactly on a new run. + This is the "stable across runs" option. + """ + p = Path(output_dir) / "predictors_used.txt" + if not p.exists(): + return None + lines = [ln.strip() for ln in p.read_text(encoding="utf-8").splitlines()] + return [ln for ln in lines if ln] From 4b88e186b1f39c48ba19947420993ad1727cc5b7 Mon Sep 17 00:00:00 2001 From: Griffin Sharps Date: Sat, 27 Dec 2025 00:07:20 +0000 Subject: [PATCH 2/5] Revision of all key pipeline programs in order to integrate expanded variable list and Laplace smoothing. General code cleanup in the aftermath of CUB client meeting. 
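A minimal sketch of the Laplace-smoothed ALR outcome construction this patch integrates (illustrative only, not the patched code: the per-cluster count columns n_<k> and the helper name are assumptions; alpha=0.5 matches the default described below):

    import polars as pl

    def alr_outcomes(counts: pl.DataFrame, clusters: list[int], baseline: int, alpha: float = 0.5) -> pl.DataFrame:
        # counts: one row per block group, household-day counts per cluster in columns n_<k>.
        k_total = len(clusters)  # global K taken from the full cluster label set
        total = sum(pl.col(f"n_{k}") for k in clusters)
        smoothed = {k: (pl.col(f"n_{k}") + alpha) / (total + alpha * k_total) for k in clusters}
        return counts.with_columns([
            (smoothed[k] / smoothed[baseline]).log().alias(f"y_{k}_vs_{baseline}")
            for k in clusters
            if k != baseline
        ])

Each y_k column can then be regressed on block-group demographics with WLS, weighting by the total household-day count per block group.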
--- analysis/clustering/clustering_validation.py | 428 ----- analysis/clustering/dtw_clustering.py | 661 -------- analysis/clustering/euclidean_clustering.py | 823 --------- .../clustering/stage2_logratio_regression.py | 1496 ++++++++--------- docs/index.html | 91 +- pyproject.toml | 12 +- scripts/process_csvs_batched_optimized.py | 338 +++- scripts/run_comed_pipeline.py | 1405 +++++----------- scripts/run_pipeline.py | 275 --- smart_meter_analysis/aws_loader.py | 464 +++-- smart_meter_analysis/census.py | 70 +- smart_meter_analysis/census_specs.py | 453 ++++- smart_meter_analysis/run_manifest.py | 64 +- smart_meter_analysis/transformation.py | 183 +- tests/validate_total_comed_pipeline.py | 986 ----------- 15 files changed, 2330 insertions(+), 5419 deletions(-) delete mode 100644 analysis/clustering/clustering_validation.py delete mode 100644 analysis/clustering/dtw_clustering.py delete mode 100644 analysis/clustering/euclidean_clustering.py delete mode 100755 scripts/run_pipeline.py delete mode 100644 tests/validate_total_comed_pipeline.py diff --git a/analysis/clustering/clustering_validation.py b/analysis/clustering/clustering_validation.py deleted file mode 100644 index c45cbd2..0000000 --- a/analysis/clustering/clustering_validation.py +++ /dev/null @@ -1,428 +0,0 @@ -""" -Validation utilities for clustering pipeline. - -Validates data quality at each stage of the DTW clustering analysis. Designed -to work with both enriched data (with census variables) and raw processed data -(without census variables) to support flexible pipeline configurations. -""" - -from __future__ import annotations - -import logging -from typing import Any - -import polars as pl - -logger = logging.getLogger(__name__) - - -class ClusteringDataValidator: - """Validates data at each stage of clustering pipeline.""" - - def __init__(self): - self.errors: list[str] = [] - self.warnings: list[str] = [] - - def reset(self): - """Reset errors and warnings for a new validation run.""" - self.errors = [] - self.warnings = [] - - def validate_enriched_data(self, df: pl.DataFrame) -> dict[str, Any]: - """ - Validate interval-level data before aggregation. - - Works with both enriched data (with census variables) and raw processed - data (without census variables). Geographic validation is skipped if - census columns are not present. 
- - Args: - df: Interval-level energy data - - Returns: - Validation results with status, errors, warnings, and statistics - """ - self.reset() - logger.info("Validating interval data...") - - # Required energy columns (must be present) - required_energy_cols = ["zip_code", "account_identifier", "datetime", "kwh"] - - # Required time columns (must be present) - required_time_cols = ["date", "hour", "weekday", "is_weekend"] - - # Geographic columns (optional - only validated if present) - geo_cols = ["block_group_geoid"] - - missing_energy = [c for c in required_energy_cols if c not in df.columns] - missing_time = [c for c in required_time_cols if c not in df.columns] - - if missing_energy: - self.errors.append(f"Missing energy columns: {missing_energy}") - if missing_time: - self.errors.append(f"Missing time columns: {missing_time}") - - # Check data completeness for critical columns - critical_cols = ["zip_code", "account_identifier", "datetime", "kwh"] - for col in critical_cols: - if col not in df.columns: - continue - null_count = df[col].null_count() - null_pct = (null_count / len(df)) * 100 - if null_pct > 5: - self.errors.append(f"{col}: {null_pct:.1f}% null values (>5%)") - elif null_pct > 0: - self.warnings.append(f"{col}: {null_pct:.1f}% null values") - - # Geographic coverage - only check if column exists - match_rate = None - if "block_group_geoid" in df.columns: - total_rows = len(df) - matched_rows = df.filter(pl.col("block_group_geoid").is_not_null()).height - match_rate = (matched_rows / total_rows) * 100 - - if match_rate < 90: - self.errors.append(f"Low geographic match rate: {match_rate:.1f}%") - elif match_rate < 95: - self.warnings.append(f"Geographic match rate below 95%: {match_rate:.1f}%") - else: - self.warnings.append("No geographic columns - running without census enrichment") - - # Check time features - if "hour" in df.columns: - hour_min, hour_max = df["hour"].min(), df["hour"].max() - if hour_min < 0 or hour_max > 23: - self.errors.append(f"Invalid hour values: {hour_min} to {hour_max}") - - if "weekday" in df.columns: - weekday_min, weekday_max = df["weekday"].min(), df["weekday"].max() - if weekday_min < 1 or weekday_max > 7: - self.errors.append(f"Invalid weekday values: {weekday_min} to {weekday_max}") - - # Check energy values - if "kwh" in df.columns: - kwh_stats = df.select([ - pl.col("kwh").min().alias("min"), - pl.col("kwh").max().alias("max"), - pl.col("kwh").mean().alias("mean"), - ]).to_dicts()[0] - - if kwh_stats["min"] is not None and kwh_stats["min"] < 0: - self.warnings.append(f"Negative kWh values: min={kwh_stats['min']:.4f}") - - if kwh_stats["max"] is not None and kwh_stats["max"] > 100: - self.warnings.append(f"Very high kWh values: max={kwh_stats['max']:.2f}") - - self._print_results("INTERVAL DATA VALIDATION") - - return { - "status": "PASS" if not self.errors else "FAIL", - "errors": self.errors, - "warnings": self.warnings, - "stats": { - "n_rows": len(df), - "n_accounts": df["account_identifier"].n_unique() if "account_identifier" in df.columns else None, - "n_zip4s": df["zip_code"].n_unique() if "zip_code" in df.columns else None, - "geographic_match_rate": match_rate, - "has_census_data": "block_group_geoid" in df.columns, - }, - } - - def validate_daily_profiles(self, df: pl.DataFrame, expected_intervals: int = 48) -> dict[str, Any]: - """ - Validate daily load profiles after aggregation. - - Ensures profiles have the expected 48-point structure and reasonable values. 
- - Args: - df: Daily profiles with 'profile' list column - expected_intervals: Expected intervals per profile (default: 48) - - Returns: - Validation results with status, errors, warnings, and statistics - """ - self.reset() - logger.info("Validating daily profiles...") - - # Check required columns - required_cols = ["zip_code", "date", "profile"] - missing = [c for c in required_cols if c not in df.columns] - if missing: - self.errors.append(f"Missing required columns: {missing}") - self._print_results("PROFILE VALIDATION") - return {"status": "FAIL", "errors": self.errors, "warnings": self.warnings} - - # Check profile completeness - if "num_intervals" in df.columns: - incomplete = df.filter( - (pl.col("num_intervals") < expected_intervals - 1) | (pl.col("num_intervals") > expected_intervals) - ) - if incomplete.height > 0: - pct = (incomplete.height / len(df)) * 100 - self.warnings.append(f"{incomplete.height} profiles ({pct:.1f}%) have incorrect interval count") - - # Check profile array lengths - profile_lengths = df.select(pl.col("profile").list.len().alias("len")).unique() - unique_lengths = profile_lengths["len"].to_list() - - if len(unique_lengths) > 1: - self.warnings.append(f"Inconsistent profile lengths: {unique_lengths}") - elif unique_lengths[0] != expected_intervals: - self.errors.append(f"Profile length {unique_lengths[0]} != expected {expected_intervals}") - - # Check for null profiles - null_profiles = df.filter(pl.col("profile").is_null()).height - if null_profiles > 0: - self.errors.append(f"{null_profiles} null profiles found") - - # Check date coverage per ZIP+4 - dates_per_zip = df.group_by("zip_code").agg([ - pl.col("date").n_unique().alias("n_dates"), - pl.col("date").min().alias("min_date"), - pl.col("date").max().alias("max_date"), - ]) - - # Flag ZIP+4s with very few days - sparse_zips = dates_per_zip.filter(pl.col("n_dates") < 5) - if sparse_zips.height > 0: - self.warnings.append(f"{sparse_zips.height} ZIP+4s have fewer than 5 days of data") - - # Check for reasonable values in profiles - if df.height > 0: - value_stats = ( - df.select(pl.col("profile").list.explode().alias("value")) - .select([ - pl.col("value").min().alias("min"), - pl.col("value").max().alias("max"), - ]) - .to_dicts()[0] - ) - - if value_stats["min"] is not None and value_stats["min"] < 0: - self.warnings.append(f"Negative values in profiles: min={value_stats['min']:.2f}") - - if value_stats["max"] is not None and value_stats["max"] > 10000: - self.warnings.append(f"Very high values in profiles: max={value_stats['max']:.2f}") - - self._print_results("DAILY PROFILES VALIDATION") - - return { - "status": "PASS" if not self.errors else "FAIL", - "errors": self.errors, - "warnings": self.warnings, - "stats": { - "n_profiles": len(df), - "n_zip4s": df["zip_code"].n_unique(), - "n_dates": df["date"].n_unique(), - "profile_length": unique_lengths[0] if unique_lengths else None, - }, - } - - def validate_demographics(self, df: pl.DataFrame, required_zip4s: set[str] | None = None) -> dict[str, Any]: - """ - Validate census demographics data. 
- - Args: - df: Demographics data with ZIP+4 codes - required_zip4s: Set of ZIP+4 codes that must have demographics - - Returns: - Validation results with status, errors, warnings, and statistics - """ - self.reset() - logger.info("Validating demographics data...") - - # Handle empty demographics (valid when running without census data) - if df is None or df.height == 0: - self.warnings.append("No demographics data - running without census enrichment") - self._print_results("DEMOGRAPHICS VALIDATION") - return { - "status": "PASS", - "errors": [], - "warnings": self.warnings, - "stats": {"n_zip4s": 0, "n_demo_vars": 0}, - } - - # Check required columns - required_cols = ["zip_code"] - missing = [c for c in required_cols if c not in df.columns] - if missing: - self.errors.append(f"Missing required columns: {missing}") - - # Check GEOID format if present - if "block_group_geoid" in df.columns: - non_null_geoids = df.filter(pl.col("block_group_geoid").is_not_null()) - - if non_null_geoids.height > 0: - geoid_lengths = non_null_geoids["block_group_geoid"].str.len_chars().unique().to_list() - if geoid_lengths and 12 not in geoid_lengths: - self.errors.append(f"Block Group GEOIDs should be 12 digits, got: {geoid_lengths}") - - # Check Illinois state FIPS - non_il = non_null_geoids.filter(~pl.col("block_group_geoid").str.starts_with("17")).height - if non_il > 0: - self.warnings.append(f"{non_il} GEOIDs don't start with '17' (Illinois)") - - # Check coverage of required ZIP+4s - if required_zip4s: - present_zips = set(df["zip_code"].to_list()) - missing_zips = required_zip4s - present_zips - if missing_zips: - self.warnings.append(f"{len(missing_zips)} required ZIP+4s missing demographics") - - # Count demographic columns - demo_cols = [c for c in df.columns if c not in ["zip_code", "block_group_geoid", "Urban_Rural_Classification"]] - - # Check for excessive nulls in demographic columns - high_null_cols = [] - for col in demo_cols: - null_pct = (df[col].null_count() / len(df)) * 100 - if null_pct > 50: - high_null_cols.append(f"{col} ({null_pct:.1f}%)") - - if high_null_cols: - self.warnings.append(f"{len(high_null_cols)} columns with >50% nulls") - - self._print_results("DEMOGRAPHICS VALIDATION") - - return { - "status": "PASS" if not self.errors else "FAIL", - "errors": self.errors, - "warnings": self.warnings, - "stats": { - "n_zip4s": len(df), - "n_demo_vars": len(demo_cols), - "high_null_vars": len(high_null_cols), - }, - } - - def _print_results(self, title: str): - """Print validation results summary.""" - print(f"\n{'=' * 80}") - print(f"{title}") - print("=" * 80) - - if self.errors: - print(f"\n❌ FAILED with {len(self.errors)} error(s):") - for err in self.errors: - print(f" - {err}") - else: - print("\n✅ PASSED all critical checks") - - if self.warnings: - print(f"\n⚠️ {len(self.warnings)} warning(s):") - for warn in self.warnings: - print(f" - {warn}") - - -def validate_interval_completeness( - df: pl.DataFrame, account_col: str = "account_identifier", date_col: str = "date", expected_intervals: int = 48 -) -> pl.DataFrame: - """ - Check interval completeness for each account-date combination. 
- - Args: - df: Interval-level data - account_col: Account identifier column name - date_col: Date column name - expected_intervals: Expected intervals per day (default: 48) - - Returns: - DataFrame with completeness statistics per account-date - """ - completeness = ( - df.group_by([account_col, date_col]) - .agg([ - pl.len().alias("n_intervals"), - pl.col("kwh").is_null().sum().alias("n_null_kwh"), - ]) - .with_columns([ - (pl.col("n_intervals") == expected_intervals).alias("is_complete"), - ((pl.col("n_intervals") - pl.col("n_null_kwh")) / expected_intervals * 100).alias("completeness_pct"), - ]) - ) - - return completeness - - -def check_for_duplicates(df: pl.DataFrame, key_cols: list[str]) -> tuple[int, pl.DataFrame | None]: - """ - Check for duplicate records based on key columns. - - Args: - df: DataFrame to check - key_cols: Columns that should be unique together - - Returns: - Tuple of (duplicate_count, duplicate_records_df) - """ - duplicates = df.group_by(key_cols).agg(pl.len().alias("count")).filter(pl.col("count") > 1) - - n_dups = duplicates.height - - if n_dups > 0: - dup_keys = duplicates.select(key_cols) - dup_records = df.join(dup_keys, on=key_cols, how="inner") - return n_dups, dup_records - - return 0, None - - -def validate_time_series_array( - profiles: list[list[float]], expected_length: int = 48, max_profiles: int = 5000 -) -> dict[str, Any]: - """ - Validate time series arrays for clustering. - - Args: - profiles: List of time series profiles - expected_length: Expected length of each profile - max_profiles: Maximum profiles to validate (samples if exceeded) - - Returns: - Validation results dictionary - """ - import numpy as np - - issues = [] - warnings = [] - - if len(profiles) > max_profiles: - logger.info(f"Sampling {max_profiles} of {len(profiles)} profiles for validation") - profiles = profiles[:max_profiles] - - # Check lengths - lengths = [len(p) for p in profiles] - unique_lengths = set(lengths) - - if len(unique_lengths) > 1: - issues.append(f"Inconsistent lengths: {unique_lengths}") - elif list(unique_lengths)[0] != expected_length: - issues.append(f"Expected length {expected_length}, got {list(unique_lengths)[0]}") - - # Check for NaN/inf values - arr = np.array(profiles, dtype=np.float32) - if np.any(np.isnan(arr)): - issues.append("NaN values detected in profiles") - if np.any(np.isinf(arr)): - issues.append("Infinite values detected in profiles") - - # Check value ranges - if np.any(arr < 0): - warnings.append(f"Negative values detected: min={arr.min():.2f}") - - if arr.max() > 10000: - warnings.append(f"Very high values detected: max={arr.max():.2f}") - - return { - "status": "PASS" if not issues else "FAIL", - "issues": issues, - "warnings": warnings, - "stats": { - "n_profiles": len(profiles), - "profile_length": list(unique_lengths)[0] if len(unique_lengths) == 1 else None, - "min_value": float(arr.min()), - "max_value": float(arr.max()), - "mean_value": float(arr.mean()), - }, - } diff --git a/analysis/clustering/dtw_clustering.py b/analysis/clustering/dtw_clustering.py deleted file mode 100644 index fcc9ba2..0000000 --- a/analysis/clustering/dtw_clustering.py +++ /dev/null @@ -1,661 +0,0 @@ -""" -Phase 2: DTW K-Means Clustering for Load Profile Analysis. - -Clusters daily electricity usage profiles using Dynamic Time Warping (DTW) -distance metric to identify consumption patterns. 
- -Performance Optimization: - - K evaluation uses a subsample (default 2000 profiles) for speed - - Final clustering runs on full dataset with optimal k - - Configurable max_iter and n_init for speed vs quality tradeoff - -Pipeline: - 1. Load daily profiles from Phase 1 - 2. Normalize profiles (optional) - 3. Evaluate k values on subsample to find optimal k - 4. Run final clustering on full dataset with optimal k - 5. Output assignments, centroids, and visualizations - -Usage: - # Standard run (evaluates k=3-6, uses subsample for evaluation) - python dtw_clustering.py \\ - --input data/clustering/sampled_profiles.parquet \\ - --output-dir data/clustering/results \\ - --k-range 3 6 \\ - --find-optimal-k \\ - --normalize - - # Fast validation run - python dtw_clustering.py \\ - --input data/clustering/sampled_profiles.parquet \\ - --output-dir data/clustering/results \\ - --k-range 3 4 \\ - --max-eval-samples 1000 \\ - --eval-max-iter 5 -""" - -from __future__ import annotations - -import argparse -import json -import logging -from pathlib import Path - -import matplotlib.pyplot as plt -import numpy as np -import polars as pl - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger(__name__) - - -def load_profiles(path: Path) -> tuple[np.ndarray, pl.DataFrame]: - """ - Load profiles from parquet file. - - Args: - path: Path to sampled_profiles.parquet - - Returns: - Tuple of (profile_array, metadata_df) - """ - logger.info(f"Loading profiles from {path}") - - df = pl.read_parquet(path) - - # Extract profiles as numpy array - profiles = np.array(df["profile"].to_list(), dtype=np.float64) - - logger.info(f" Loaded {len(profiles):,} profiles with {profiles.shape[1]} time points each") - logger.info(f" Data shape: {profiles.shape}") - logger.info(f" Data range: [{profiles.min():.2f}, {profiles.max():.2f}]") - - return profiles, df - - -def normalize_profiles( - X: np.ndarray, - method: str = "zscore", -) -> np.ndarray: - """ - Normalize profiles for clustering. - - Args: - X: Profile array of shape (n_samples, n_timepoints) - method: Normalization method ('zscore', 'minmax', 'none') - - Returns: - Normalized array - """ - if method == "none": - return X - - logger.info(f"Normalizing profiles using {method} method...") - - if method == "zscore": - # Per-profile z-score normalization - means = X.mean(axis=1, keepdims=True) - stds = X.std(axis=1, keepdims=True) - stds[stds == 0] = 1 # Avoid division by zero - X_norm = (X - means) / stds - elif method == "minmax": - # Per-profile min-max normalization - mins = X.min(axis=1, keepdims=True) - maxs = X.max(axis=1, keepdims=True) - ranges = maxs - mins - ranges[ranges == 0] = 1 - X_norm = (X - mins) / ranges - else: - raise ValueError(f"Unknown normalization method: {method}") - - logger.info(f" Normalized data range: [{X_norm.min():.2f}, {X_norm.max():.2f}]") - - return X_norm - - -def evaluate_clustering( - X: np.ndarray, - k_range: range, - max_iter: int = 10, - n_init: int = 3, - random_state: int = 42, - max_eval_samples: int = 2000, -) -> dict: - """ - Evaluate clustering for different values of k using a subsample. - - Uses a random subsample for evaluation to reduce runtime while still - providing reliable estimates of optimal k. 
- - Args: - X: Profile array of shape (n_samples, n_timepoints) - k_range: Range of k values to test - max_iter: Maximum iterations per k-means run - n_init: Number of random initializations - random_state: Random seed for reproducibility - max_eval_samples: Maximum profiles to use for evaluation - - Returns: - Dictionary with k_values, inertia, and silhouette scores - """ - from sklearn.metrics import silhouette_score - from tslearn.clustering import TimeSeriesKMeans - - logger.info(f"Evaluating clustering for k in {list(k_range)}...") - - # Subsample for evaluation if dataset is large - if X.shape[0] > max_eval_samples: - rng = np.random.default_rng(random_state) - idx = rng.choice(X.shape[0], size=max_eval_samples, replace=False) - X_eval = X[idx] - logger.info(f" Using subsample of {max_eval_samples:,} profiles for k evaluation") - logger.info(f" (Full dataset: {X.shape[0]:,} profiles will be used for final clustering)") - else: - X_eval = X - logger.info(f" Using all {X_eval.shape[0]:,} profiles for evaluation") - - results = { - "k_values": [], - "inertia": [], - "silhouette": [], - } - - # Reshape for tslearn (n_samples, n_timepoints, n_features) - X_reshaped = X_eval.reshape(X_eval.shape[0], X_eval.shape[1], 1) - - for k in k_range: - logger.info(f"\n Testing k={k}...") - - model = TimeSeriesKMeans( - n_clusters=k, - metric="dtw", - max_iter=max_iter, - n_init=n_init, - random_state=random_state, - n_jobs=-1, - verbose=0, - ) - - labels = model.fit_predict(X_reshaped) - - # Use Euclidean distance for silhouette (faster, still informative) - sil_score = silhouette_score(X_eval, labels, metric="euclidean") - - results["k_values"].append(k) - results["inertia"].append(float(model.inertia_)) - results["silhouette"].append(float(sil_score)) - - logger.info(f" Inertia: {model.inertia_:.2f}") - logger.info(f" Silhouette: {sil_score:.3f}") - - return results - - -def find_optimal_k(eval_results: dict) -> int: - """ - Find optimal k based on silhouette score. - - Args: - eval_results: Results from evaluate_clustering - - Returns: - Optimal k value - """ - k_values = eval_results["k_values"] - silhouettes = eval_results["silhouette"] - - best_idx = np.argmax(silhouettes) - best_k = k_values[best_idx] - - logger.info(f"\nOptimal k={best_k} (silhouette={silhouettes[best_idx]:.3f})") - - return best_k - - -def run_final_clustering( - X: np.ndarray, - k: int, - max_iter: int = 10, - n_init: int = 3, - random_state: int = 42, -) -> tuple[np.ndarray, np.ndarray, float]: - """ - Run final clustering on full dataset with chosen k. 
- - Args: - X: Full profile array - k: Number of clusters - max_iter: Maximum iterations - n_init: Number of random initializations - random_state: Random seed - - Returns: - Tuple of (labels, centroids, inertia) - """ - from tslearn.clustering import TimeSeriesKMeans - - logger.info(f"\nRunning final clustering with k={k} on {X.shape[0]:,} profiles...") - - X_reshaped = X.reshape(X.shape[0], X.shape[1], 1) - - model = TimeSeriesKMeans( - n_clusters=k, - metric="dtw", - max_iter=max_iter, - n_init=n_init, - random_state=random_state, - n_jobs=-1, - verbose=1, - ) - - labels = model.fit_predict(X_reshaped) - centroids = model.cluster_centers_.squeeze() # Remove extra dimension - - logger.info(f" Final inertia: {model.inertia_:.2f}") - - # Log cluster distribution - unique, counts = np.unique(labels, return_counts=True) - for cluster, count in zip(unique, counts): - pct = count / len(labels) * 100 - logger.info(f" Cluster {cluster}: {count:,} profiles ({pct:.1f}%)") - - return labels, centroids, float(model.inertia_) - - -def plot_centroids( - centroids: np.ndarray, - output_path: Path, - title: str = "Cluster Centroids (Average Load Profiles)", -) -> None: - """ - Plot cluster centroids showing typical daily patterns. - - Args: - centroids: Centroid array of shape (k, n_timepoints) - output_path: Path to save plot - title: Plot title - """ - fig, ax = plt.subplots(figsize=(12, 6)) - - hours = np.arange(0, 24, 0.5) # 48 half-hour intervals - - colors = plt.cm.tab10(np.linspace(0, 1, len(centroids))) - - for i, (centroid, color) in enumerate(zip(centroids, colors)): - ax.plot(hours, centroid, label=f"Cluster {i}", color=color, linewidth=2) - - ax.set_xlabel("Hour of Day", fontsize=12) - ax.set_ylabel("Normalized Energy Usage", fontsize=12) - ax.set_title(title, fontsize=14) - ax.legend(loc="upper right") - ax.set_xticks(range(0, 25, 3)) - ax.grid(True, alpha=0.3) - - plt.tight_layout() - plt.savefig(output_path, dpi=150) - plt.close() - - logger.info(f" Saved centroid plot: {output_path}") - - -def plot_cluster_samples( - X: np.ndarray, - labels: np.ndarray, - centroids: np.ndarray, - output_path: Path, - n_samples: int = 50, -) -> None: - """ - Plot sample profiles from each cluster with centroid overlay. 
- - Args: - X: Profile array - labels: Cluster assignments - centroids: Cluster centroids - output_path: Path to save plot - n_samples: Number of sample profiles per cluster - """ - k = len(centroids) - fig, axes = plt.subplots(1, k, figsize=(5 * k, 4), sharey=True) - - if k == 1: - axes = [axes] - - hours = np.arange(0, 24, 0.5) - - for i, ax in enumerate(axes): - cluster_mask = labels == i - cluster_profiles = X[cluster_mask] - - # Sample profiles to plot - n_to_plot = min(n_samples, len(cluster_profiles)) - if n_to_plot > 0: - idx = np.random.choice(len(cluster_profiles), n_to_plot, replace=False) - for profile in cluster_profiles[idx]: - ax.plot(hours, profile, alpha=0.2, color="gray", linewidth=0.5) - - # Plot centroid - ax.plot(hours, centroids[i], color="red", linewidth=2, label="Centroid") - - ax.set_title(f"Cluster {i} (n={cluster_mask.sum():,})") - ax.set_xlabel("Hour of Day") - if i == 0: - ax.set_ylabel("Normalized Usage") - ax.set_xticks(range(0, 25, 6)) - ax.grid(True, alpha=0.3) - ax.legend(loc="upper right") - - plt.tight_layout() - plt.savefig(output_path, dpi=150) - plt.close() - - logger.info(f" Saved cluster samples plot: {output_path}") - - -def plot_elbow_curve( - eval_results: dict, - output_path: Path, -) -> None: - """ - Plot elbow curve showing inertia and silhouette vs k. - - Args: - eval_results: Results from evaluate_clustering - output_path: Path to save plot - """ - fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4)) - - k_values = eval_results["k_values"] - - # Inertia plot - ax1.plot(k_values, eval_results["inertia"], "bo-", linewidth=2, markersize=8) - ax1.set_xlabel("Number of Clusters (k)") - ax1.set_ylabel("Inertia") - ax1.set_title("Elbow Curve") - ax1.grid(True, alpha=0.3) - - # Silhouette plot - ax2.plot(k_values, eval_results["silhouette"], "go-", linewidth=2, markersize=8) - ax2.set_xlabel("Number of Clusters (k)") - ax2.set_ylabel("Silhouette Score") - ax2.set_title("Silhouette Score vs k") - ax2.grid(True, alpha=0.3) - - # Mark best k - best_idx = np.argmax(eval_results["silhouette"]) - ax2.axvline(x=k_values[best_idx], color="red", linestyle="--", alpha=0.7) - ax2.annotate( - f"Best k={k_values[best_idx]}", - xy=(k_values[best_idx], eval_results["silhouette"][best_idx]), - xytext=(10, 10), - textcoords="offset points", - fontsize=10, - color="red", - ) - - plt.tight_layout() - plt.savefig(output_path, dpi=150) - plt.close() - - logger.info(f" Saved elbow curve: {output_path}") - - -def save_results( - df: pl.DataFrame, - labels: np.ndarray, - centroids: np.ndarray, - eval_results: dict | None, - metadata: dict, - output_dir: Path, -) -> None: - """ - Save all clustering results to output directory. 
- - Args: - df: Original profile DataFrame with metadata - labels: Cluster assignments - centroids: Cluster centroids - eval_results: K evaluation results (if any) - metadata: Clustering metadata - output_dir: Output directory - """ - output_dir.mkdir(parents=True, exist_ok=True) - - # Save cluster assignments - assignments = df.select(["zip_code", "date", "is_weekend", "weekday"]).with_columns(pl.Series("cluster", labels)) - assignments_path = output_dir / "cluster_assignments.parquet" - assignments.write_parquet(assignments_path) - logger.info(f" Saved assignments: {assignments_path}") - - # Save centroids as parquet - centroids_df = pl.DataFrame({ - "cluster": list(range(len(centroids))), - "centroid": [c.tolist() for c in centroids], - }) - centroids_path = output_dir / "cluster_centroids.parquet" - centroids_df.write_parquet(centroids_path) - logger.info(f" Saved centroids: {centroids_path}") - - # Save k evaluation results - if eval_results: - eval_path = output_dir / "k_evaluation.json" - with open(eval_path, "w") as f: - json.dump(eval_results, f, indent=2) - logger.info(f" Saved k evaluation: {eval_path}") - - # Save metadata - metadata_path = output_dir / "clustering_metadata.json" - with open(metadata_path, "w") as f: - json.dump(metadata, f, indent=2) - logger.info(f" Saved metadata: {metadata_path}") - - -def main() -> None: - parser = argparse.ArgumentParser( - description="DTW K-Means Clustering for Load Profiles", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Standard run with k evaluation - python dtw_clustering.py \\ - --input data/clustering/sampled_profiles.parquet \\ - --output-dir data/clustering/results \\ - --k-range 3 6 --find-optimal-k --normalize - - # Fast run for testing - python dtw_clustering.py \\ - --input data/clustering/sampled_profiles.parquet \\ - --output-dir data/clustering/results \\ - --k-range 3 4 --max-eval-samples 1000 --eval-max-iter 5 - - # Fixed k (no evaluation) - python dtw_clustering.py \\ - --input data/clustering/sampled_profiles.parquet \\ - --output-dir data/clustering/results \\ - --k 4 --normalize - """, - ) - - parser.add_argument( - "--input", - type=Path, - required=True, - help="Path to sampled_profiles.parquet", - ) - parser.add_argument( - "--output-dir", - type=Path, - default=Path("data/clustering/results"), - help="Output directory for results", - ) - - # K selection - k_group = parser.add_argument_group("Cluster Selection") - k_group.add_argument( - "--k", - type=int, - default=None, - help="Fixed number of clusters (skip evaluation)", - ) - k_group.add_argument( - "--k-range", - type=int, - nargs=2, - metavar=("MIN", "MAX"), - default=[3, 6], - help="Range of k values to evaluate (default: 3 6)", - ) - k_group.add_argument( - "--find-optimal-k", - action="store_true", - help="Evaluate k range and use optimal k", - ) - - # Performance tuning - perf_group = parser.add_argument_group("Performance Tuning") - perf_group.add_argument( - "--max-eval-samples", - type=int, - default=2000, - help="Max profiles for k evaluation subsample (default: 2000)", - ) - perf_group.add_argument( - "--eval-max-iter", - type=int, - default=10, - help="Max iterations for k evaluation runs (default: 10)", - ) - perf_group.add_argument( - "--eval-n-init", - type=int, - default=3, - help="Number of initializations for k evaluation (default: 3)", - ) - perf_group.add_argument( - "--final-max-iter", - type=int, - default=10, - help="Max iterations for final clustering (default: 10)", - ) - perf_group.add_argument( 
- "--final-n-init", - type=int, - default=3, - help="Number of initializations for final clustering (default: 3)", - ) - - # Preprocessing - parser.add_argument( - "--normalize", - action="store_true", - help="Apply z-score normalization to profiles", - ) - parser.add_argument( - "--normalize-method", - choices=["zscore", "minmax", "none"], - default="zscore", - help="Normalization method (default: zscore)", - ) - - parser.add_argument( - "--random-state", - type=int, - default=42, - help="Random seed for reproducibility", - ) - - args = parser.parse_args() - - print("=" * 70) - print("PHASE 2: DTW K-MEANS CLUSTERING") - print("=" * 70) - - # Load profiles - X, df = load_profiles(args.input) - - # Normalize if requested - if args.normalize: - X = normalize_profiles(X, method=args.normalize_method) - - # Determine k - eval_results = None - - if args.k is not None: - # Fixed k - k = args.k - logger.info(f"\nUsing fixed k={k}") - elif args.find_optimal_k: - # Evaluate k range on subsample - k_range = range(args.k_range[0], args.k_range[1] + 1) - - eval_results = evaluate_clustering( - X, - k_range=k_range, - max_iter=args.eval_max_iter, - n_init=args.eval_n_init, - random_state=args.random_state, - max_eval_samples=args.max_eval_samples, - ) - - # Save elbow curve - args.output_dir.mkdir(parents=True, exist_ok=True) - plot_elbow_curve(eval_results, args.output_dir / "elbow_curve.png") - - k = find_optimal_k(eval_results) - else: - # Default to min of k_range - k = args.k_range[0] - logger.info(f"\nUsing default k={k}") - - # Run final clustering on full dataset - labels, centroids, inertia = run_final_clustering( - X, - k=k, - max_iter=args.final_max_iter, - n_init=args.final_n_init, - random_state=args.random_state, - ) - - # Create visualizations - logger.info("\nGenerating visualizations...") - args.output_dir.mkdir(parents=True, exist_ok=True) - - plot_centroids(centroids, args.output_dir / "cluster_centroids.png") - plot_cluster_samples(X, labels, centroids, args.output_dir / "cluster_samples.png") - - # Save results - logger.info("\nSaving results...") - - metadata = { - "k": k, - "n_profiles": len(X), - "n_timepoints": X.shape[1], - "normalized": args.normalize, - "normalize_method": args.normalize_method if args.normalize else None, - "max_iter": args.final_max_iter, - "n_init": args.final_n_init, - "random_state": args.random_state, - "inertia": inertia, - "eval_max_samples": args.max_eval_samples if args.find_optimal_k else None, - } - - save_results(df, labels, centroids, eval_results, metadata, args.output_dir) - - # Summary - print("\n" + "=" * 70) - print("CLUSTERING COMPLETE") - print("=" * 70) - print(f"\nResults saved to: {args.output_dir}") - print(f" • {len(X):,} profiles clustered into {k} groups") - print(f" • Inertia: {inertia:.2f}") - if eval_results: - best_sil = max(eval_results["silhouette"]) - print(f" • Best silhouette score: {best_sil:.3f}") - print("=" * 70) - - -if __name__ == "__main__": - main() diff --git a/analysis/clustering/euclidean_clustering.py b/analysis/clustering/euclidean_clustering.py deleted file mode 100644 index 9ce1e2a..0000000 --- a/analysis/clustering/euclidean_clustering.py +++ /dev/null @@ -1,823 +0,0 @@ -#!/usr/bin/env python3 -""" -Phase 2: K-Means Clustering for Load Profile Analysis. - -Clusters daily electricity usage profiles using standard Euclidean distance -to identify distinct consumption patterns. - -Pipeline: - 1. Load daily profiles from Phase 1 - 2. Normalize profiles (optional) - 3. 
Evaluate k values to find optimal k (via silhouette score on a sample) - 4. Run final clustering with optimal k - 5. Output assignments, centroids, and visualizations - -Usage: - # Standard run (evaluates k=3-6 using silhouette on up to 10k samples) - python euclidean_clustering_fixed.py \\ - --input data/clustering/sampled_profiles.parquet \\ - --output-dir data/clustering/results \\ - --k-range 3 6 \\ - --find-optimal-k \\ - --normalize \\ - --silhouette-sample-size 10000 - - # Fixed k (no evaluation) - python euclidean_clustering.py \\ - --input data/clustering/sampled_profiles.parquet \\ - --output-dir data/clustering/results \\ - --k 4 --normalize -""" - -from __future__ import annotations - -import argparse -import json -import logging -from pathlib import Path - -import matplotlib.pyplot as plt -import numpy as np -import polars as pl -from sklearn.cluster import KMeans -from sklearn.metrics import silhouette_score - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger(__name__) - -DEFAULT_NORMALIZATION: str = "minmax" -DEFAULT_NORMALIZE: bool = True - - -def load_profiles(path: Path) -> tuple[np.ndarray, pl.DataFrame]: - """ - Load profiles from parquet file. - - Args: - path: Path to sampled_profiles.parquet - - Returns: - Tuple of (profile_array, metadata_df) - """ - logger.info(f"Loading profiles from {path}") - - df = pl.read_parquet(path) - - # Extract profiles as numpy array - profiles = np.array(df["profile"].to_list(), dtype=np.float64) - - logger.info( - " Loaded %s profiles with %s time points each", - f"{len(profiles):,}", - profiles.shape[1], - ) - logger.info(" Data shape: %s", (profiles.shape[0], profiles.shape[1])) - logger.info(" Data range: [%.2f, %.2f]", profiles.min(), profiles.max()) - - return profiles, df - - -def normalize_profiles( - df: pl.DataFrame, - method: str = "minmax", - profile_col: str = "profile", - out_col: str | None = None, -) -> pl.DataFrame: - """ - Normalize per-household-day profiles for clustering. - - Parameters - ---------- - df : pl.DataFrame - Must contain a list column with the profile values (e.g. 48-dim vector). - method : {"none", "zscore", "minmax"} - - "none": return df unchanged - - "zscore": per-profile z-score: (x - mean) / std - - "minmax": per-profile min-max: (x - min) / (max - min) - profile_col : str - Name of the list column holding the raw profile. - out_col : str | None - If provided, write normalized profile to this column; otherwise overwrite - `profile_col` in-place. - - Notes - ----- - - Normalization is done per profile (per row), not globally. - - For degenerate profiles where max == min, we fall back to all zeros. 
- """ - - if method == "none": - return df - - if profile_col not in df.columns: - raise ValueError(f"normalize_profiles: column '{profile_col}' not found in DataFrame") - - target_col = out_col or profile_col - - expr = pl.col(profile_col) - - if method == "zscore": - mean_expr = expr.list.mean() - std_expr = expr.list.std(ddof=0) - - normalized = (expr - mean_expr) / std_expr - - # If std == 0 (flat profile), fall back to zeros - normalized = pl.when(std_expr != 0).then(normalized).otherwise(expr * 0.0) - - elif method == "minmax": - min_expr = expr.list.min() - max_expr = expr.list.max() - range_expr = max_expr - min_expr - - normalized = (expr - min_expr) / range_expr - - # If range == 0 (flat profile), fall back to zeros - normalized = pl.when(range_expr != 0).then(normalized).otherwise(expr * 0.0) - - else: - raise ValueError(f"Unknown normalization method: {method!r}") - - return df.with_columns(normalized.alias(target_col)) - - -def evaluate_clustering( - X: np.ndarray, - k_range: range, - n_init: int = 10, - random_state: int = 42, - silhouette_sample_size: int | None = 10_000, -) -> dict: - """ - Evaluate clustering for different values of k. - - Uses inertia on the full dataset and silhouette score computed on a - subsample (to avoid O(n^2) cost when n is large). - - Args: - X: Profile array of shape (n_samples, n_timepoints) - k_range: Range of k values to test - n_init: Number of random initializations - random_state: Random seed for reproducibility - silhouette_sample_size: Max number of samples for silhouette. - If None, uses full dataset (NOT recommended for very large n). - - Returns: - Dictionary with k_values, inertia, and silhouette scores - """ - n_samples = X.shape[0] - logger.info("Evaluating clustering for k in %s...", list(k_range)) - logger.info(" Dataset size: %s profiles", f"{n_samples:,}") - - if silhouette_sample_size is None: - logger.info(" Silhouette: using FULL dataset (may be very slow).") - elif n_samples > silhouette_sample_size: - logger.info( - " Silhouette: using a random subsample of %s profiles.", - f"{silhouette_sample_size:,}", - ) - else: - logger.info( - " Silhouette: using all %s profiles (<= sample size).", - f"{n_samples:,}", - ) - - results = { - "k_values": [], - "inertia": [], - "silhouette": [], - } - - for k in k_range: - logger.info("") - logger.info(" Testing k=%d...", k) - - model = KMeans( - n_clusters=k, - n_init=n_init, - random_state=random_state, - ) - - labels = model.fit_predict(X) - - # Inertia on full data - inertia = float(model.inertia_) - - # Silhouette on sample (or full data if silhouette_sample_size is None) - sil_kwargs: dict = {"metric": "euclidean"} - if silhouette_sample_size is not None and n_samples > silhouette_sample_size: - sil_kwargs["sample_size"] = silhouette_sample_size - sil_kwargs["random_state"] = random_state - - sil_score = silhouette_score(X, labels, **sil_kwargs) - - results["k_values"].append(k) - results["inertia"].append(inertia) - results["silhouette"].append(float(sil_score)) - - logger.info(" Inertia: %s", f"{inertia:,.2f}") - logger.info(" Silhouette: %.3f", sil_score) - - return results - - -def find_optimal_k(eval_results: dict) -> int: - """ - Find optimal k based on silhouette score. 
- - Args: - eval_results: Results from evaluate_clustering - - Returns: - Optimal k value - """ - k_values = eval_results["k_values"] - silhouettes = eval_results["silhouette"] - - best_idx = int(np.argmax(silhouettes)) - best_k = int(k_values[best_idx]) - - logger.info("") - logger.info( - "Optimal k=%d (silhouette=%.3f)", - best_k, - silhouettes[best_idx], - ) - - return best_k - - -def run_clustering( - X: np.ndarray, - k: int, - n_init: int = 10, - random_state: int = 42, -) -> tuple[np.ndarray, np.ndarray, float]: - """ - Run k-means clustering. - - Args: - X: Profile array - k: Number of clusters - n_init: Number of random initializations - random_state: Random seed - - Returns: - Tuple of (labels, centroids, inertia) - """ - logger.info("") - logger.info( - "Running k-means with k=%d on %s profiles...", - k, - f"{X.shape[0]:,}", - ) - - model = KMeans( - n_clusters=k, - n_init=n_init, - random_state=random_state, - ) - - labels = model.fit_predict(X) - centroids = model.cluster_centers_ - inertia = float(model.inertia_) - - logger.info(" Inertia: %s", f"{inertia:,.2f}") - - # Log cluster distribution - unique, counts = np.unique(labels, return_counts=True) - for cluster, count in zip(unique, counts): - pct = count / len(labels) * 100 - logger.info( - " Cluster %d: %s profiles (%.1f%%)", - cluster, - f"{count:,}", - pct, - ) - - return labels, centroids, inertia - - -def plot_centroids( - centroids: np.ndarray, - output_path: Path, -) -> None: - """ - Plot cluster centroids (average load profiles). - - Args: - centroids: Array of shape (k, n_timepoints) - output_path: Path to save plot - """ - k = len(centroids) - n_timepoints = centroids.shape[1] - - # Create hour labels (assuming 48 half-hourly intervals) - if n_timepoints == 48: - hours = np.arange(0.5, 24.5, 0.5) - xlabel = "Hour of Day" - elif n_timepoints == 24: - hours = np.arange(1, 25) - xlabel = "Hour of Day" - else: - hours = np.arange(n_timepoints) - xlabel = "Time Interval" - - fig, ax = plt.subplots(figsize=(12, 6)) - - colors = plt.cm.tab10(np.linspace(0, 1, k)) - - for i, (centroid, color) in enumerate(zip(centroids, colors)): - ax.plot(hours, centroid, label=f"Cluster {i}", color=color, linewidth=2) - - ax.set_xlabel(xlabel, fontsize=12) - ax.set_ylabel("Normalized Usage", fontsize=12) - ax.set_title("Cluster Centroids (Average Load Profiles)", fontsize=14) - ax.legend(loc="upper right") - ax.grid(True, alpha=0.3) - - if n_timepoints == 48: - ax.set_xticks(range(0, 25, 4)) - ax.set_xlim(0, 24) - - plt.tight_layout() - plt.savefig(output_path, dpi=150) - plt.close() - - logger.info(" Saved centroids plot: %s", output_path) - - -def plot_cluster_samples( - X: np.ndarray, - labels: np.ndarray, - centroids: np.ndarray, - output_path: Path, - n_samples: int = 50, - random_state: int = 42, -) -> None: - """ - Plot sample profiles from each cluster with centroid overlay. 
- - Args: - X: Profile array - labels: Cluster assignments - centroids: Cluster centroids - output_path: Path to save plot - n_samples: Number of sample profiles per cluster - random_state: Random seed - """ - k = len(centroids) - n_timepoints = X.shape[1] - - # Create hour labels - if n_timepoints == 48: - hours = np.arange(0.5, 24.5, 0.5) - elif n_timepoints == 24: - hours = np.arange(1, 25) - else: - hours = np.arange(n_timepoints) - - fig, axes = plt.subplots(1, k, figsize=(5 * k, 4), sharey=True) - if k == 1: - axes = [axes] - - rng = np.random.default_rng(random_state) - colors = plt.cm.tab10(np.linspace(0, 1, k)) - - for i, (ax, color) in enumerate(zip(axes, colors)): - cluster_mask = labels == i - cluster_profiles = X[cluster_mask] - - n_available = len(cluster_profiles) - if n_available == 0: - ax.set_title(f"Cluster {i} (n=0)") - ax.grid(True, alpha=0.3) - continue - - n_plot = min(n_samples, n_available) - idx = rng.choice(n_available, size=n_plot, replace=False) - - # Plot samples with transparency - for profile in cluster_profiles[idx]: - ax.plot(hours, profile, color=color, alpha=0.1, linewidth=0.5) - - # Plot centroid - ax.plot(hours, centroids[i], color="black", linewidth=2, label="Centroid") - - ax.set_title(f"Cluster {i} (n={n_available:,})") - ax.set_xlabel("Hour") - if i == 0: - ax.set_ylabel("Normalized Usage") - ax.grid(True, alpha=0.3) - - if n_timepoints == 48: - ax.set_xticks(range(0, 25, 6)) - ax.set_xlim(0, 24) - - plt.tight_layout() - plt.savefig(output_path, dpi=150) - plt.close() - - logger.info(" Saved cluster samples plot: %s", output_path) - - -def plot_elbow_curve( - eval_results: dict, - output_path: Path, -) -> None: - """ - Plot elbow curve (inertia and silhouette vs k). - - Args: - eval_results: Results from evaluate_clustering - output_path: Path to save plot - """ - k_values = eval_results["k_values"] - inertia = eval_results["inertia"] - silhouette = eval_results["silhouette"] - - fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4)) - - # Inertia (elbow curve) - ax1.plot(k_values, inertia, "b-o", linewidth=2, markersize=8) - ax1.set_xlabel("Number of Clusters (k)", fontsize=12) - ax1.set_ylabel("Inertia", fontsize=12) - ax1.set_title("Elbow Curve", fontsize=14) - ax1.grid(True, alpha=0.3) - ax1.set_xticks(k_values) - - # Silhouette score - ax2.plot(k_values, silhouette, "g-o", linewidth=2, markersize=8) - ax2.set_xlabel("Number of Clusters (k)", fontsize=12) - ax2.set_ylabel("Silhouette Score", fontsize=12) - ax2.set_title("Silhouette Score", fontsize=14) - ax2.grid(True, alpha=0.3) - ax2.set_xticks(k_values) - - # Mark optimal k - best_idx = int(np.argmax(silhouette)) - ax2.axvline(x=k_values[best_idx], color="red", linestyle="--", alpha=0.7) - ax2.scatter( - [k_values[best_idx]], - [silhouette[best_idx]], - s=200, - facecolors="none", - edgecolors="red", - linewidths=2, - zorder=5, - ) - - plt.tight_layout() - plt.savefig(output_path, dpi=150) - plt.close() - - logger.info(" Saved elbow curve: %s", output_path) - - -def analyze_weekday_weekend_distribution( - df: pl.DataFrame, - labels: np.ndarray, -) -> dict: - """ - Analyze weekday vs weekend distribution across clusters. - - This diagnostic checks if certain clusters are dominated by weekdays - or weekends, which would suggest usage patterns are day-type dependent. 
- - Args: - df: Original profile DataFrame with 'is_weekend' column - labels: Cluster assignments - - Returns: - Dictionary with distribution statistics - """ - if "is_weekend" not in df.columns: - logger.warning(" No 'is_weekend' column found - skipping weekday/weekend analysis") - return {} - - # Add cluster labels to dataframe - df_with_clusters = df.with_columns(pl.Series("cluster", labels)) - - # Calculate distribution - dist = ( - df_with_clusters.group_by(["cluster", "is_weekend"]) - .agg(pl.len().alias("count")) - .sort(["cluster", "is_weekend"]) - ) - - # Calculate percentages per cluster - dist = dist.with_columns([(pl.col("count") / pl.col("count").sum().over("cluster") * 100).alias("pct")]) - - # Summary: % weekend by cluster - summary = ( - df_with_clusters.group_by("cluster") - .agg([pl.len().alias("total"), (pl.col("is_weekend").sum() / pl.len() * 100).alias("pct_weekend")]) - .sort("cluster") - ) - - logger.info("") - logger.info("=" * 70) - logger.info("WEEKDAY/WEEKEND DISTRIBUTION BY CLUSTER") - logger.info("=" * 70) - - for row in summary.iter_rows(named=True): - cluster = row["cluster"] - total = row["total"] - pct_weekend = row["pct_weekend"] - pct_weekday = 100 - pct_weekend - - logger.info( - " Cluster %d: %.1f%% weekday, %.1f%% weekend (n=%s)", cluster, pct_weekday, pct_weekend, f"{total:,}" - ) - - # Flag significant imbalances (>70% one type) - if pct_weekend > 70: - logger.warning(" ⚠️ Weekend-dominated cluster") - elif pct_weekday > 70: - logger.warning(" ⚠️ Weekday-dominated cluster") - - # Overall distribution - overall_weekend_pct = float(df_with_clusters["is_weekend"].mean() * 100) - logger.info("") - logger.info(" Overall dataset: %.1f%% weekend, %.1f%% weekday", overall_weekend_pct, 100 - overall_weekend_pct) - - # Chi-square test would go here if needed for formal significance testing - logger.info("=" * 70) - - return { - "cluster_distribution": summary.to_dicts(), - "detailed_distribution": dist.to_dicts(), - "overall_weekend_pct": overall_weekend_pct, - } - - -def save_results( - df: pl.DataFrame, - labels: np.ndarray, - centroids: np.ndarray, - eval_results: dict | None, - metadata: dict, - output_dir: Path, -) -> None: - """ - Save all clustering results to output directory. 
- - Args: - df: Original profile DataFrame with metadata - labels: Cluster assignments - centroids: Cluster centroids - eval_results: K evaluation results (if any) - metadata: Clustering metadata - output_dir: Output directory - """ - output_dir.mkdir(parents=True, exist_ok=True) - - # Determine which ID columns are present (household vs ZIP+4 level) - id_cols: list[str] = [] - if "account_identifier" in df.columns: - id_cols.append("account_identifier") - if "zip_code" in df.columns: - id_cols.append("zip_code") - id_cols.extend(["date", "is_weekend", "weekday"]) - - # Only include columns that exist - available_cols = [c for c in id_cols if c in df.columns] - - # Save cluster assignments - assignments = df.select(available_cols).with_columns( - pl.Series("cluster", labels), - ) - assignments_path = output_dir / "cluster_assignments.parquet" - assignments.write_parquet(assignments_path) - logger.info(" Saved assignments: %s", assignments_path) - - # Save centroids as parquet - centroids_df = pl.DataFrame({ - "cluster": list(range(len(centroids))), - "centroid": [c.tolist() for c in centroids], - }) - centroids_path = output_dir / "cluster_centroids.parquet" - centroids_df.write_parquet(centroids_path) - logger.info(" Saved centroids: %s", centroids_path) - - # Save k evaluation results - if eval_results: - eval_path = output_dir / "k_evaluation.json" - with open(eval_path, "w") as f: - json.dump(eval_results, f, indent=2) - logger.info(" Saved k evaluation: %s", eval_path) - - # Save metadata - metadata_path = output_dir / "clustering_metadata.json" - with open(metadata_path, "w") as f: - json.dump(metadata, f, indent=2) - logger.info(" Saved metadata: %s", metadata_path) - - -def main() -> None: - parser = argparse.ArgumentParser( - description="K-Means Clustering for Load Profiles (Euclidean Distance)", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Standard run with k evaluation (silhouette on sample) - python euclidean_clustering.py \\ - --input data/clustering/sampled_profiles.parquet \\ - --output-dir data/clustering/results \\ - --k-range 3 6 --find-optimal-k --normalize \\ - --silhouette-sample-size 10000 - - # Fixed k (no evaluation) - python euclidean_clustering.py \\ - --input data/clustering/sampled_profiles.parquet \\ - --output-dir data/clustering/results \\ - --k 4 --normalize - """, - ) - - parser.add_argument( - "--input", - type=Path, - required=True, - help="Path to sampled_profiles.parquet", - ) - parser.add_argument( - "--output-dir", - type=Path, - default=Path("data/clustering/results"), - help="Output directory for results", - ) - - # K selection - k_group = parser.add_argument_group("Cluster Selection") - k_group.add_argument( - "--k", - type=int, - default=None, - help="Fixed number of clusters (skip evaluation)", - ) - k_group.add_argument( - "--k-range", - type=int, - nargs=2, - metavar=("MIN", "MAX"), - default=[3, 6], - help="Range of k values to evaluate (default: 3 6)", - ) - k_group.add_argument( - "--find-optimal-k", - action="store_true", - help="Evaluate k range and use optimal k", - ) - k_group.add_argument( - "--silhouette-sample-size", - type=int, - default=10_000, - help=( - "Max number of samples for silhouette evaluation " - "(default: 10000; use -1 to use full dataset, not recommended for large n)." 
- ), - ) - - # Clustering parameters - parser.add_argument( - "--n-init", - type=int, - default=10, - help="Number of k-means initializations (default: 10)", - ) - - # Preprocessing - parser.add_argument( - "--normalize", - action="store_true", - default=DEFAULT_NORMALIZE, - help="Apply normalization to profiles (default: True)", - ) - parser.add_argument( - "--normalize-method", - choices=["zscore", "minmax", "none"], - default=DEFAULT_NORMALIZATION, - help="Normalization method (default: minmax)", - ) - - parser.add_argument( - "--random-state", - type=int, - default=42, - help="Random seed for reproducibility", - ) - - args = parser.parse_args() - - print("=" * 70) - print("PHASE 2: K-MEANS CLUSTERING (EUCLIDEAN DISTANCE)") - print("=" * 70) - - # Load profiles - X, df = load_profiles(args.input) - - logger.info("Loaded %s sampled profiles", f"{len(df):,}") - - # Normalize if requested - if args.normalize: - logger.info("Normalizing profiles per household-day (method=%s)", args.normalize_method) - df = normalize_profiles( - df, - method=args.normalize_method, # ✅ FIXED: was args.normalization_method - profile_col="profile", - out_col=None, - ) - # ✅ CRITICAL FIX: Re-extract normalized profiles as numpy array - X = np.array(df["profile"].to_list(), dtype=np.float64) - logger.info(" Normalized data range: [%.2f, %.2f]", X.min(), X.max()) - else: - logger.info("Profile normalization disabled (using raw kWh values).") - - # Determine k - eval_results = None - - if args.k is not None: - # Fixed k - k = args.k - logger.info("") - logger.info("Using fixed k=%d", k) - elif args.find_optimal_k: - # Evaluate k range - k_range = range(args.k_range[0], args.k_range[1] + 1) - - silhouette_sample_size: int | None - silhouette_sample_size = None if args.silhouette_sample_size < 0 else args.silhouette_sample_size - - eval_results = evaluate_clustering( - X, - k_range=k_range, - n_init=args.n_init, - random_state=args.random_state, - silhouette_sample_size=silhouette_sample_size, - ) - - # Save elbow curve - args.output_dir.mkdir(parents=True, exist_ok=True) - plot_elbow_curve(eval_results, args.output_dir / "elbow_curve.png") - - k = find_optimal_k(eval_results) - else: - # Default to min of k_range - k = args.k_range[0] - logger.info("") - logger.info("Using default k=%d", k) - - # Run final clustering - labels, centroids, inertia = run_clustering( - X, - k=k, - n_init=args.n_init, - random_state=args.random_state, - ) - - # Create visualizations - logger.info("") - logger.info("Generating visualizations...") - args.output_dir.mkdir(parents=True, exist_ok=True) - - plot_centroids(centroids, args.output_dir / "cluster_centroids.png") - plot_cluster_samples(X, labels, centroids, args.output_dir / "cluster_samples.png") - - # Save results - logger.info("") - logger.info("Saving results...") - - metadata = { - "k": int(k), - "n_profiles": int(X.shape[0]), - "n_timepoints": int(X.shape[1]), - "normalized": bool(args.normalize), - "normalize_method": args.normalize_method if args.normalize else None, - "n_init": int(args.n_init), - "random_state": int(args.random_state), - "inertia": float(inertia), - "distance_metric": "euclidean", - "silhouette_sample_size": (None if args.silhouette_sample_size < 0 else int(args.silhouette_sample_size)), - } - - save_results(df, labels, centroids, eval_results, metadata, args.output_dir) - - # Summary - print("\n" + "=" * 70) - print("CLUSTERING COMPLETE") - print("=" * 70) - print(f"\nResults saved to: {args.output_dir}") - print(f" • {X.shape[0]:,} profiles clustered 
into {k} groups")
-    print(f"   • Inertia: {inertia:,.2f}")
-    if eval_results:
-        best_sil = max(eval_results["silhouette"])
-        print(f"   • Best silhouette score: {best_sil:.3f}")
-    print("=" * 70)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/analysis/clustering/stage2_logratio_regression.py b/analysis/clustering/stage2_logratio_regression.py
index 5319b1c..bbc73eb 100644
--- a/analysis/clustering/stage2_logratio_regression.py
+++ b/analysis/clustering/stage2_logratio_regression.py
@@ -1,45 +1,28 @@
 #!/usr/bin/env python3
-"""
-Stage 2: Block-Group-Level Log-Ratio Regression of Cluster Composition (HOUSEHOLD-DAY UNITS)
-
-Goal
------
-Model how Census block-group demographics are associated with the *composition*
-of household-day observations across load-profile clusters, using log-ratio
-regression (ALR / additive log-ratio).
-
-Unit of Analysis
-----------------
-One row per Census BLOCK GROUP (not one row per block_group x cluster).
-
-Data Flow
----------
-1. Load household-day cluster assignments from Stage 1 (one row per household-day)
-2. Join to Census block groups via ZIP+4 → block group crosswalk (1-to-1 enforced)
-3. Aggregate to block-group-level cluster composition (wide format)
-4. Join block groups to Census demographics
-5. Create smoothed proportions and log-ratios vs a baseline cluster:
-       y_k = log(p_k / p_base)
-6. Fit separate WLS regressions for each non-baseline cluster:
-       y_k ~ demographics  with weights = total_obs (household-day count)
-7. Fit OLS models (robustness check, unweighted)
-
-Outputs
--------
+"""Stage 2: Block-Group-Level Log-Ratio Regression of Cluster Composition (HOUSEHOLD-DAY UNITS)
+
+Aggregates household-day cluster assignments to Census block groups and fits
+additive log-ratio (ALR) regressions of Laplace-smoothed cluster proportions
+relative to a baseline cluster, using WLS with weights = total household-day
+observations per block group.
+
+Regulatory deliverables (written to output_dir):
+- regression_results.parquet       # long coefficient table (primary)
+- regression_diagnostics.json      # R² + residual stats + data-loss metrics
+- regression_diagnostics.png       # diagnostic plots (per outcome)
+- interpretation_guide.md          # coefficient interpretation guide
+- stage2_metadata.json             # full provenance (parameters, inputs, sizes)
 - regression_data_blockgroups_wide.parquet
-- regression_results_logratio_blockgroups.json
 - statsmodels_summaries_wls.txt
-- statsmodels_summaries_ols.txt
+- statsmodels_summaries_ols.txt (optional)
 - regression_report_logratio_blockgroups.txt
+- stage2_manifest.json (+ predictor lists) via write_stage2_manifest
 
-Usage
------
-    python stage2_logratio_regression.py \
-        --clusters data/clustering/results/cluster_assignments.parquet \
-        --crosswalk data/reference/2023_comed_zip4_census_crosswalk.txt \
-        --census-cache data/reference/census_17_2023.parquet \
-        --output-dir data/clustering/results/stage2_blockgroups_logratio \
-        --baseline-cluster 1
+Design:
+- Household-day inputs processed lazily; only block-group aggregates collected.
+- Laplace smoothing alpha=0.5 default.
+- Smoothing denominator uses global K (from full cluster label set),
+  not K among post-filter clusters.
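A compact numeric sketch of the workflow this new header describes, assuming the standard additive-smoothing form p_k = (n_k + alpha) / (N + alpha * K) with the global cluster count K: smooth one block group's cluster counts, take additive log-ratios against the baseline cluster, then fit WLS with total household-day observations as weights. All values and column names below (y_cluster2, total_obs, the *_z predictors) are invented for illustration; the patch's own aggregation and model-fitting code is the real implementation:

    import numpy as np
    import pandas as pd
    import statsmodels.api as sm

    # One block group's household-day counts across K = 4 clusters (made-up numbers).
    counts = np.array([120.0, 30.0, 0.0, 50.0])
    alpha, K = 0.5, counts.size
    p = (counts + alpha) / (counts.sum() + alpha * K)   # smoothed proportions, strictly positive
    y = np.log(p / p[0])                                # ALR vs baseline cluster 0; y[0] == 0

    # WLS across many block groups: one regression per non-baseline cluster,
    # weights = total household-day observations per block group (toy frame only).
    rng = np.random.default_rng(0)
    bg = pd.DataFrame({
        "y_cluster2": rng.normal(size=50),        # log(p_2 / p_baseline) per block group
        "pct_renters_z": rng.normal(size=50),     # standardized demographic predictors
        "median_income_z": rng.normal(size=50),
        "total_obs": rng.integers(20, 500, size=50),
    })
    X = sm.add_constant(bg[["pct_renters_z", "median_income_z"]])
    fit = sm.WLS(bg["y_cluster2"], X, weights=bg["total_obs"]).fit()
    print(fit.params)  # a one-unit predictor shift multiplies p_2 / p_baseline by exp(coef)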
""" from __future__ import annotations @@ -48,764 +31,635 @@ import json import logging import sys +from collections.abc import Iterable +from datetime import datetime, timezone from pathlib import Path +from typing import Any +import matplotlib.pyplot as plt import numpy as np import polars as pl import statsmodels.api as sm from sklearn.preprocessing import StandardScaler +from statsmodels.graphics.gofplots import qqplot +from statsmodels.stats.outliers_influence import OLSInfluence from smart_meter_analysis.census import fetch_census_data from smart_meter_analysis.census_specs import STAGE2_PREDICTORS_47 from smart_meter_analysis.run_manifest import write_stage2_manifest logger = logging.getLogger(__name__) -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", -) +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") -def load_cluster_assignments_household_day(path: Path) -> tuple[pl.DataFrame, dict]: - """ - Load household-day cluster assignments. - - Returns the raw Stage 1 output: one row per (household, day) with cluster label. - - Returns - ------- - df : pl.DataFrame - One row per household-day with columns: - - account_identifier - - zip_code - - date (if present) - - cluster - - dominance_stats : dict - Summary statistics on how consistently households stay in one cluster - (for reporting/interpretation, not used in regression) - """ - logger.info("Loading cluster assignments from %s", path) +# ---------------------------- +# IO + aggregation helpers +# ---------------------------- - # For large files, compute stats without loading everything - logger.info(" Computing statistics in streaming mode...") - lf = pl.scan_parquet(path) - # Quick validation +def _validate_clusters_schema(lf: pl.LazyFrame, path: Path) -> list[str]: schema = lf.collect_schema() required = ["account_identifier", "zip_code", "cluster"] missing = [c for c in required if c not in schema.names()] if missing: - raise ValueError(f"cluster_assignments missing required columns: {missing}") + raise ValueError(f"Missing required columns in {path}: {missing}") - # Compute basic stats without loading full data - stats_df = lf.select([ - pl.len().alias("n_household_days"), - pl.col("account_identifier").n_unique().alias("n_households"), - pl.col("cluster").n_unique().alias("n_clusters"), - ]).collect() + cluster_type = schema.get("cluster") + if cluster_type not in (pl.Int32, pl.Int64, pl.Utf8, pl.String): + raise ValueError(f"cluster column must be integer or string, got {cluster_type}") - stats = stats_df.to_dicts()[0] + return required - logger.info( - " Found: %s household-day observations, %s households, %s clusters", - f"{stats['n_household_days']:,}", - f"{stats['n_households']:,}", - stats["n_clusters"], - ) - - # Compute dominance stats with streaming - logger.info(" Computing dominance statistics...") - dominance_stats = _compute_dominance_stats_streaming(path) - logger.info( - " Dominance stats: mean=%.1f%%, median=%.1f%%, >50%%: %.1f%% of households", - dominance_stats["dominance_mean"] * 100, - dominance_stats["dominance_median"] * 100, - dominance_stats["pct_above_50"], +def _compute_dominance_stats_streaming(path: Path) -> dict[str, Any]: + """Household dominance = share of a household's sampled days assigned to its most common cluster. + Computed streaming (does not materialize the full household-day table). 
+ """ + lf = pl.scan_parquet(path).select(["account_identifier", "cluster"]) + dom = ( + lf.group_by(["account_identifier", "cluster"]) + .agg(pl.len().alias("n")) + .group_by("account_identifier") + .agg( + pl.col("n").sum().alias("total_days"), + pl.col("n").max().alias("max_in_cluster"), + ) + .with_columns((pl.col("max_in_cluster") / pl.col("total_days")).alias("dominance")) + .select( + pl.col("dominance").mean().alias("dominance_mean"), + pl.col("dominance").median().alias("dominance_median"), + (pl.col("dominance") > 0.5).mean().alias("pct_above_50"), + ) + .collect(streaming=True) # type: ignore[call-overload] + .to_dicts()[0] ) + return dom # type: ignore[no-any-return] - # Now load only what we need for regression: just the columns - logger.info(" Loading data for regression (selecting needed columns only)...") - raw = lf.select(required).collect(streaming=True) - return raw, dominance_stats +def load_household_day_clusters(path: Path) -> tuple[pl.LazyFrame, dict[str, Any], dict[str, Any]]: + """Load household-day cluster assignments as a LazyFrame with required columns only. + Returns: + household_days_lf: lazy frame (account_identifier, zip_code, cluster[, date]) + basic_stats: small dict of counts for logging/manifesting + dominance_stats: household-level dominance metrics (streaming) -def _compute_dominance_stats_streaming(path: Path) -> dict: - """ - Compute dominance stats using streaming aggregation to avoid OOM. """ lf = pl.scan_parquet(path) + required = _validate_clusters_schema(lf, path) - # Compute per-household cluster counts - counts = ( - lf.group_by(["account_identifier", "cluster"]).agg(pl.len().alias("days_in_cluster")).collect(streaming=True) + cols = required.copy() + schema_names = lf.collect_schema().names() + if "date" in schema_names: + cols.append("date") + + household_days_lf = lf.select(cols) + + stats = ( + household_days_lf.select( + pl.len().alias("n_household_days"), + pl.col("account_identifier").n_unique().alias("n_households"), + pl.col("cluster").n_unique().alias("n_clusters"), + ) + .collect(streaming=True) # type: ignore[call-overload] + .to_dicts()[0] ) - # Get max days per household - max_days = counts.group_by("account_identifier").agg(pl.col("days_in_cluster").max().alias("max_days_in_cluster")) + dominance_stats = _compute_dominance_stats_streaming(path) - # Get total days per household - totals = counts.group_by("account_identifier").agg(pl.col("days_in_cluster").sum().alias("n_days")) + logger.info( + "Loaded clusters: %s household-days, %s households, %s clusters", + f"{stats['n_household_days']:,}", + f"{stats['n_households']:,}", + stats["n_clusters"], + ) - # Join and compute dominance - dominance = max_days.join(totals, on="account_identifier") - dominance = dominance.with_columns((pl.col("max_days_in_cluster") / pl.col("n_days")).alias("dominance")) + return household_days_lf, stats, dominance_stats - dom_series = dominance["dominance"] - return { - "n_households": int(dominance.height), - "dominance_mean": float(dom_series.mean()), - "dominance_median": float(dom_series.median()), - "dominance_std": float(dom_series.std()), - "dominance_min": float(dom_series.min()), - "dominance_max": float(dom_series.max()), - "pct_above_50": float((dom_series > 0.5).mean() * 100), - "pct_above_67": float((dom_series > 0.67).mean() * 100), - "pct_above_80": float((dom_series > 0.80).mean() * 100), - } +def choose_baseline_cluster(household_days_lf: pl.LazyFrame) -> str: + """Baseline is the most frequent cluster by household-day 
observations.""" + dist = ( + household_days_lf.group_by("cluster") + .agg(pl.len().alias("n_obs")) + .sort("n_obs", descending=True) + .collect(streaming=True) # type: ignore[call-overload] + ) + if dist.is_empty(): + raise ValueError("No cluster assignments found; cannot choose baseline.") + return str(dist["cluster"][0]) -def load_crosswalk_one_to_one(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame: +def list_cluster_labels(household_days_lf: pl.LazyFrame) -> list[str]: + """Return stable, sorted cluster labels present in the full household-day data. + Used to lock global K for smoothing. """ - Load ZIP+4 → Census block-group crosswalk with deterministic 1-to-1 mapping. + labels = ( + household_days_lf.select(pl.col("cluster").cast(pl.Utf8).alias("cluster")) + .unique() + .collect(streaming=True) # type: ignore[call-overload] + .get_column("cluster") + .to_list() + ) + return sorted([str(x) for x in labels if x is not None]) + + +def load_crosswalk_one_to_one( + crosswalk_path: Path, + *, + zip_codes_lf: pl.LazyFrame | None = None, +) -> tuple[pl.LazyFrame, dict[str, Any]]: + """Load ZIP+4 -> block group crosswalk and enforce deterministic 1-to-1 linkage. - When fan-out exists (ZIP+4 maps to multiple block groups), - choose smallest GEOID per ZIP+4 to avoid double-counting household-day observations. + Crosswalk expected to contain: Zip, Zip4, CensusKey2023 (tab-separated). + Output columns: zip_code, block_group_geoid + + Returns: + mapping_lf: 1 row per zip_code with deterministic BG assignment + metrics: fanout + size metrics for provenance - This is the only valid approach when crosswalk weights are unavailable. """ - logger.info("Loading crosswalk from %s", crosswalk_path) + logger.info("Loading crosswalk: %s", crosswalk_path) - crosswalk = ( + lf = ( pl.scan_csv(crosswalk_path, separator="\t", infer_schema_length=10000) .with_columns([ (pl.col("Zip").cast(pl.Utf8).str.zfill(5) + "-" + pl.col("Zip4").cast(pl.Utf8).str.zfill(4)).alias( - "zip_code" + "zip_code", ), pl.col("CensusKey2023").cast(pl.Utf8).str.zfill(15).str.slice(0, 12).alias("block_group_geoid"), ]) - .filter(pl.col("zip_code").is_in(zip_codes)) .select(["zip_code", "block_group_geoid"]) - .collect() - ) - - logger.info( - " Matched %s of %s ZIP+4 codes", - f"{crosswalk['zip_code'].n_unique():,}", - f"{len(set(zip_codes)):,}", + .drop_nulls() ) - if crosswalk.is_empty(): - logger.warning(" Crosswalk is empty after filtering for sample ZIP+4s.") - return crosswalk + if zip_codes_lf is not None: + lf = lf.join(zip_codes_lf, on="zip_code", how="semi") - # Check for fan-out - fanout = crosswalk.group_by("zip_code").agg(pl.n_unique("block_group_geoid").alias("n_block_groups")) - max_fanout = int(fanout["n_block_groups"].max()) + metrics = ( + lf.select( + pl.len().alias("n_rows"), + pl.col("zip_code").n_unique().alias("n_zip4"), + pl.col("block_group_geoid").n_unique().alias("n_bg"), + ) + .collect(streaming=True) # type: ignore[call-overload] + .to_dicts()[0] + ) - if max_fanout > 1: - n_fanout = fanout.filter(pl.col("n_block_groups") > 1).height - pct_fanout = (n_fanout / len(fanout)) * 100 + fanout = ( + lf.group_by("zip_code") + .agg(pl.col("block_group_geoid").n_unique().alias("n_bg")) + .filter(pl.col("n_bg") > 1) + .select(pl.len().alias("n_zip4_multi_bg")) + .collect(streaming=True) # type: ignore[call-overload] + .item() + ) + metrics["n_zip4_multi_bg"] = int(fanout) + if fanout: logger.warning( - " ZIP+4 fan-out detected: %s ZIP+4s (%.1f%%) map to multiple block groups (max=%d per ZIP+4)", - 
f"{n_fanout:,}", - pct_fanout, - max_fanout, - ) - logger.warning(" Applying deterministic 1-to-1 mapping: selecting smallest GEOID per ZIP+4") - logger.warning(" This prevents double-counting household-day observations") - - # Deterministic resolution: smallest GEOID per ZIP+4 - crosswalk = ( - crosswalk.sort(["zip_code", "block_group_geoid"]) - .group_by("zip_code") - .agg(pl.col("block_group_geoid").first()) + "Crosswalk fan-out: %s ZIP+4s map to multiple block groups; using smallest GEOID per ZIP+4.", + f"{fanout:,}", ) - logger.info( - " After 1-to-1 resolution: %s ZIP+4 codes → %s unique mappings", - f"{len(crosswalk):,}", - f"{len(crosswalk):,}", - ) - else: - logger.info(" Crosswalk is already 1-to-1: each ZIP+4 maps to exactly one block group.") + # Deterministic 1-to-1 mapping: smallest GEOID per ZIP+4 + mapping = lf.group_by("zip_code").agg(pl.col("block_group_geoid").min().alias("block_group_geoid")) + return mapping, metrics - return crosswalk +def attach_block_groups( + household_days_lf: pl.LazyFrame, + crosswalk_lf: pl.LazyFrame, +) -> tuple[pl.LazyFrame, dict[str, Any]]: + """Join ZIP+4 -> block group onto household-day cluster assignments. -def attach_block_groups_to_household_days( - household_days: pl.DataFrame, - crosswalk: pl.DataFrame, -) -> pl.DataFrame: - """ - Attach block-group GEOIDs to household-day observations via ZIP+4. + Returns: + joined_lf: household-day LF with block_group_geoid (nulls dropped) + metrics: join/drop metrics for provenance - Input: one row per household-day - Output: one row per household-day with block_group_geoid attached """ - logger.info("Joining household-day observations to block groups...") - - df = household_days.join(crosswalk, on="zip_code", how="left") + before = household_days_lf.select(pl.len().alias("n")).collect(streaming=True).item() # type: ignore[call-overload] + joined = household_days_lf.join(crosswalk_lf, on="zip_code", how="left") + after_nonnull = ( + joined.select(pl.col("block_group_geoid").is_not_null().sum().alias("n")).collect(streaming=True).item() # type: ignore[call-overload] + ) - n_before = len(df) - missing = df.filter(pl.col("block_group_geoid").is_null()).height + dropped = int(before - after_nonnull) + metrics = { + "household_days_before_crosswalk": int(before), + "household_days_after_crosswalk_nonnull": int(after_nonnull), + "household_days_dropped_missing_crosswalk": dropped, + "pct_dropped_missing_crosswalk": float(dropped / before) if before else 0.0, + } - if missing > 0: - pct = missing / n_before * 100 - logger.warning(" %s (%.1f%%) observations missing block_group - dropping", f"{missing:,}", pct) - df = df.filter(pl.col("block_group_geoid").is_not_null()) + if dropped: + logger.warning( + "Dropped %s household-days (%.2f%%) due to missing ZIP+4 crosswalk mapping.", + f"{dropped:,}", + metrics["pct_dropped_missing_crosswalk"] * 100.0, + ) - logger.info( - " %s household-day observations across %s block groups", - f"{len(df):,}", - f"{df['block_group_geoid'].n_unique():,}", - ) + return joined.drop_nulls("block_group_geoid"), metrics - return df +def aggregate_blockgroup_composition( + household_days_bg_lf: pl.LazyFrame, + *, + cluster_labels: Iterable[str], +) -> pl.DataFrame: + """Aggregate to block-group-level composition. -def aggregate_blockgroup_cluster_composition(df: pl.DataFrame) -> pl.DataFrame: + Returns one row per block group with: + - total_obs (household-day count; WLS weight) + - total_households (unique households; descriptive) + - cluster_