diff --git a/.gitignore b/.gitignore index 110a564..15eb7d1 100644 --- a/.gitignore +++ b/.gitignore @@ -170,3 +170,5 @@ results/ # benchmark artificats profiles/ +docs/*.html +docs/index_files/ diff --git a/analysis/clustering/euclidean_clustering.py b/analysis/clustering/euclidean_clustering.py index 983c14e..9ce1e2a 100644 --- a/analysis/clustering/euclidean_clustering.py +++ b/analysis/clustering/euclidean_clustering.py @@ -1,23 +1,20 @@ +#!/usr/bin/env python3 """ Phase 2: K-Means Clustering for Load Profile Analysis. Clusters daily electricity usage profiles using standard Euclidean distance to identify distinct consumption patterns. -Note: DTW (Dynamic Time Warping) is unnecessary here because all profiles -are aligned to the same 48 half-hourly time grid. Euclidean distance is -appropriate and much faster. - Pipeline: 1. Load daily profiles from Phase 1 - 2. Normalize profiles (optional but recommended) + 2. Normalize profiles (optional) 3. Evaluate k values to find optimal k (via silhouette score on a sample) 4. Run final clustering with optimal k 5. Output assignments, centroids, and visualizations Usage: # Standard run (evaluates k=3-6 using silhouette on up to 10k samples) - python euclidean_clustering.py \\ + python euclidean_clustering_fixed.py \\ --input data/clustering/sampled_profiles.parquet \\ --output-dir data/clustering/results \\ --k-range 3 6 \\ @@ -51,6 +48,9 @@ ) logger = logging.getLogger(__name__) +DEFAULT_NORMALIZATION: str = "minmax" +DEFAULT_NORMALIZE: bool = True + def load_profiles(path: Path) -> tuple[np.ndarray, pl.DataFrame]: """ @@ -81,47 +81,67 @@ def load_profiles(path: Path) -> tuple[np.ndarray, pl.DataFrame]: def normalize_profiles( - X: np.ndarray, - method: str = "zscore", -) -> np.ndarray: + df: pl.DataFrame, + method: str = "minmax", + profile_col: str = "profile", + out_col: str | None = None, +) -> pl.DataFrame: """ - Normalize profiles for clustering. - - Args: - X: Profile array of shape (n_samples, n_timepoints) - method: Normalization method ('zscore', 'minmax', 'none') - - Returns: - Normalized array + Normalize per-household-day profiles for clustering. + + Parameters + ---------- + df : pl.DataFrame + Must contain a list column with the profile values (e.g. 48-dim vector). + method : {"none", "zscore", "minmax"} + - "none": return df unchanged + - "zscore": per-profile z-score: (x - mean) / std + - "minmax": per-profile min-max: (x - min) / (max - min) + profile_col : str + Name of the list column holding the raw profile. + out_col : str | None + If provided, write normalized profile to this column; otherwise overwrite + `profile_col` in-place. + + Notes + ----- + - Normalization is done per profile (per row), not globally. + - For degenerate profiles where max == min, we fall back to all zeros. 
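    Examples
    --------
    Worked example added for illustration (3-point profiles instead of the
    real 48-point ones; values rounded):

        raw profile    [0.2, 0.6, 1.0]
        minmax      -> [0.0, 0.5, 1.0]      # (x - 0.2) / 0.8
        zscore      -> [-1.22, 0.0, 1.22]   # (x - 0.6) / 0.327 (ddof=0)
        flat [0.4, 0.4, 0.4] -> [0.0, 0.0, 0.0] under either method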
""" + if method == "none": - return X + return df - logger.info("Normalizing profiles using %s method...", method) + if profile_col not in df.columns: + raise ValueError(f"normalize_profiles: column '{profile_col}' not found in DataFrame") + + target_col = out_col or profile_col + + expr = pl.col(profile_col) if method == "zscore": - # Per-profile z-score normalization - means = X.mean(axis=1, keepdims=True) - stds = X.std(axis=1, keepdims=True) - stds[stds == 0] = 1 # Avoid division by zero - X_norm = (X - means) / stds + mean_expr = expr.list.mean() + std_expr = expr.list.std(ddof=0) + + normalized = (expr - mean_expr) / std_expr + + # If std == 0 (flat profile), fall back to zeros + normalized = pl.when(std_expr != 0).then(normalized).otherwise(expr * 0.0) + elif method == "minmax": - # Per-profile min-max normalization - mins = X.min(axis=1, keepdims=True) - maxs = X.max(axis=1, keepdims=True) - ranges = maxs - mins - ranges[ranges == 0] = 1 - X_norm = (X - mins) / ranges - else: - raise ValueError(f"Unknown normalization method: {method}") + min_expr = expr.list.min() + max_expr = expr.list.max() + range_expr = max_expr - min_expr - logger.info( - " Normalized data range: [%.2f, %.2f]", - X_norm.min(), - X_norm.max(), - ) + normalized = (expr - min_expr) / range_expr + + # If range == 0 (flat profile), fall back to zeros + normalized = pl.when(range_expr != 0).then(normalized).otherwise(expr * 0.0) - return X_norm + else: + raise ValueError(f"Unknown normalization method: {method!r}") + + return df.with_columns(normalized.alias(target_col)) def evaluate_clustering( @@ -457,6 +477,83 @@ def plot_elbow_curve( logger.info(" Saved elbow curve: %s", output_path) +def analyze_weekday_weekend_distribution( + df: pl.DataFrame, + labels: np.ndarray, +) -> dict: + """ + Analyze weekday vs weekend distribution across clusters. + + This diagnostic checks if certain clusters are dominated by weekdays + or weekends, which would suggest usage patterns are day-type dependent. 
+ + Args: + df: Original profile DataFrame with 'is_weekend' column + labels: Cluster assignments + + Returns: + Dictionary with distribution statistics + """ + if "is_weekend" not in df.columns: + logger.warning(" No 'is_weekend' column found - skipping weekday/weekend analysis") + return {} + + # Add cluster labels to dataframe + df_with_clusters = df.with_columns(pl.Series("cluster", labels)) + + # Calculate distribution + dist = ( + df_with_clusters.group_by(["cluster", "is_weekend"]) + .agg(pl.len().alias("count")) + .sort(["cluster", "is_weekend"]) + ) + + # Calculate percentages per cluster + dist = dist.with_columns([(pl.col("count") / pl.col("count").sum().over("cluster") * 100).alias("pct")]) + + # Summary: % weekend by cluster + summary = ( + df_with_clusters.group_by("cluster") + .agg([pl.len().alias("total"), (pl.col("is_weekend").sum() / pl.len() * 100).alias("pct_weekend")]) + .sort("cluster") + ) + + logger.info("") + logger.info("=" * 70) + logger.info("WEEKDAY/WEEKEND DISTRIBUTION BY CLUSTER") + logger.info("=" * 70) + + for row in summary.iter_rows(named=True): + cluster = row["cluster"] + total = row["total"] + pct_weekend = row["pct_weekend"] + pct_weekday = 100 - pct_weekend + + logger.info( + " Cluster %d: %.1f%% weekday, %.1f%% weekend (n=%s)", cluster, pct_weekday, pct_weekend, f"{total:,}" + ) + + # Flag significant imbalances (>70% one type) + if pct_weekend > 70: + logger.warning(" ⚠️ Weekend-dominated cluster") + elif pct_weekday > 70: + logger.warning(" ⚠️ Weekday-dominated cluster") + + # Overall distribution + overall_weekend_pct = float(df_with_clusters["is_weekend"].mean() * 100) + logger.info("") + logger.info(" Overall dataset: %.1f%% weekend, %.1f%% weekday", overall_weekend_pct, 100 - overall_weekend_pct) + + # Chi-square test would go here if needed for formal significance testing + logger.info("=" * 70) + + return { + "cluster_distribution": summary.to_dicts(), + "detailed_distribution": dist.to_dicts(), + "overall_weekend_pct": overall_weekend_pct, + } + + def save_results( df: pl.DataFrame, labels: np.ndarray, @@ -597,13 +694,14 @@ def main() -> None: parser.add_argument( "--normalize", action="store_true", - help="Apply normalization to profiles", + default=DEFAULT_NORMALIZE, + help="Apply normalization to profiles (default: True)", ) parser.add_argument( "--normalize-method", choices=["zscore", "minmax", "none"], - default="zscore", - help="Normalization method (default: zscore)", + default=DEFAULT_NORMALIZATION, + help="Normalization method (default: minmax)", ) parser.add_argument( @@ -622,9 +720,22 @@ def main() -> None: # Load profiles X, df = load_profiles(args.input) + logger.info("Loaded %s sampled profiles", f"{len(df):,}") + # Normalize if requested if args.normalize: - X = normalize_profiles(X, method=args.normalize_method) + logger.info("Normalizing profiles per household-day (method=%s)", args.normalize_method) + df = normalize_profiles( + df, + method=args.normalize_method, # ✅ FIXED: was args.normalization_method + profile_col="profile", + out_col=None, + ) + # ✅ CRITICAL FIX: Re-extract normalized profiles as numpy array + X = np.array(df["profile"].to_list(), dtype=np.float64) + logger.info(" Normalized data range: [%.2f, %.2f]", X.min(), X.max()) + else: + logger.info("Profile normalization disabled (using raw kWh values).") # Determine k eval_results = None diff --git a/analysis/clustering/euclidean_clustering_minibatch.py b/analysis/clustering/euclidean_clustering_minibatch.py new file mode 100644 index 0000000..b036e4b --- 
/dev/null +++ b/analysis/clustering/euclidean_clustering_minibatch.py @@ -0,0 +1,382 @@ +#!/usr/bin/env python3 +""" +MiniBatch K-Means clustering for large load-profile datasets (PyArrow-batched). + +Reads an input Parquet file in streaming batches via PyArrow, optionally normalizes +each profile row on-the-fly, fits MiniBatchKMeans with partial_fit, then predicts and +writes cluster assignments back to Parquet incrementally (original columns + `cluster`). + +Outputs: +- cluster_assignments.parquet +- cluster_centroids.parquet +- cluster_centroids.png +- clustering_metadata.json +""" + +from __future__ import annotations + +import argparse +import json +import logging +from collections.abc import Iterable +from pathlib import Path + +import matplotlib.pyplot as plt +import numpy as np +import polars as pl +import pyarrow as pa +import pyarrow.parquet as pq +from sklearn.cluster import MiniBatchKMeans +from sklearn.metrics import silhouette_score + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +# ---------------------------- +# IO + batch utilities +# ---------------------------- + + +def parquet_num_rows(path: Path) -> int: + """Return Parquet row count from file metadata (no full scan).""" + pf = pq.ParquetFile(path) + md = pf.metadata + if md is None: + return sum(pf.metadata.row_group(i).num_rows for i in range(pf.num_row_groups)) + return int(md.num_rows) + + +def iter_profile_batches( + path: Path, + batch_size: int, + columns: list[str] | None = None, +) -> Iterable[pa.RecordBatch]: + """Yield record batches from a Parquet file.""" + pf = pq.ParquetFile(path) + yield from pf.iter_batches(batch_size=batch_size, columns=columns) + + +def recordbatch_profiles_to_numpy(rb: pa.RecordBatch, profile_col: str = "profile") -> np.ndarray: + """Convert RecordBatch `profile` list column into a 2D float64 NumPy array.""" + idx = rb.schema.get_field_index(profile_col) + if idx < 0: + raise ValueError(f"RecordBatch missing required column '{profile_col}'") + profiles = rb.column(idx).to_pylist() + X = np.asarray(profiles, dtype=np.float64) + if X.ndim != 2: + raise ValueError(f"Expected 2D profile array; got shape={X.shape}") + return X + + +# ---------------------------- +# Normalization +# ---------------------------- + + +def normalize_batch(X: np.ndarray, method: str) -> np.ndarray: + """Row-wise normalization: minmax, zscore, or none. 
Constant rows -> zeros.""" + if method in ("none", "", None): + Xn = X + elif method == "minmax": + mins = np.min(X, axis=1, keepdims=True) + maxs = np.max(X, axis=1, keepdims=True) + denom = maxs - mins + denom_safe = np.where(denom == 0, 1.0, denom) + Xn = (X - mins) / denom_safe + Xn = np.where(denom == 0, 0.0, Xn) + elif method == "zscore": + means = np.mean(X, axis=1, keepdims=True) + stds = np.std(X, axis=1, keepdims=True) + std_safe = np.where(stds == 0, 1.0, stds) + Xn = (X - means) / std_safe + Xn = np.where(stds == 0, 0.0, Xn) + else: + raise ValueError(f"Unknown normalize method: {method}") + + if not np.isfinite(Xn).all(): + Xn = np.nan_to_num(Xn, nan=0.0, posinf=0.0, neginf=0.0) + return Xn + + +# ---------------------------- +# Clustering +# ---------------------------- + + +def fit_minibatch_kmeans( + input_path: Path, + k: int, + batch_size: int, + n_init: int, + random_state: int, + normalize: bool, + normalize_method: str, +) -> MiniBatchKMeans: + """Fit MiniBatchKMeans by streaming over `profile` batches and calling partial_fit().""" + logger.info("Fitting MiniBatchKMeans (k=%d, batch_size=%s, n_init=%d)...", k, f"{batch_size:,}", n_init) + + model = MiniBatchKMeans( + n_clusters=k, + batch_size=batch_size, + n_init=n_init, + random_state=random_state, + verbose=0, + ) + + total_rows = parquet_num_rows(input_path) + n_batches = (total_rows + batch_size - 1) // batch_size + logger.info(" Training on %s profiles in %d batches", f"{total_rows:,}", n_batches) + + seen = 0 + for bi, rb in enumerate(iter_profile_batches(input_path, batch_size=batch_size, columns=["profile"]), start=1): + X = recordbatch_profiles_to_numpy(rb, profile_col="profile") + if normalize and normalize_method != "none": + X = normalize_batch(X, normalize_method) + model.partial_fit(X) + seen += X.shape[0] + if bi % 10 == 0 or bi == n_batches: + logger.info(" Trained batch %d/%d (seen=%s)", bi, n_batches, f"{seen:,}") + + logger.info(" Training complete. Inertia: %s", f"{float(model.inertia_):,.2f}") + return model + + +# ruff: noqa: C901 +def predict_and_write_assignments_streaming( + model: MiniBatchKMeans, + input_path: Path, + output_path: Path, + batch_size: int, + normalize: bool, + normalize_method: str, + silhouette_sample_idx: np.ndarray, +) -> tuple[np.ndarray, float]: + """ + Predict labels in batches, write output Parquet incrementally, and optionally + compute silhouette on a sampled set of global row indices. 
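    Note (illustrative): the sample indices are global row positions into the
    input Parquet and are assumed to be sorted ascending. For a batch covering
    rows [global_row, global_row + n), the pointer walk below picks out exactly
    the sampled rows in that window; a NumPy-mask equivalent would be:

        in_batch = sample_idx[(sample_idx >= global_row) & (sample_idx < global_row + n)]
        local_positions = in_batch - global_row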
+ """ + logger.info("Predicting labels + writing assignments streaming...") + output_path.parent.mkdir(parents=True, exist_ok=True) + + pf = pq.ParquetFile(input_path) + out_schema = pf.schema_arrow.append(pa.field("cluster", pa.int32())) + writer = pq.ParquetWriter(output_path, out_schema, compression="zstd") + + k = int(model.n_clusters) + counts = np.zeros(k, dtype=np.int64) + + sample_X: list[np.ndarray] = None + sample_y: list[int] = None + sample_pos = 0 + if silhouette_sample_idx is not None and len(silhouette_sample_idx) > 0: + sample_X, sample_y = [], [] + + global_row = 0 + try: + for rb in iter_profile_batches(input_path, batch_size=batch_size, columns=None): + X = recordbatch_profiles_to_numpy(rb, profile_col="profile") + if normalize and normalize_method != "none": + X = normalize_batch(X, normalize_method) + + labels = model.predict(X).astype(np.int32) + counts += np.bincount(labels, minlength=k) + + if sample_X is not None and sample_y is not None: + n = X.shape[0] + while sample_pos < len(silhouette_sample_idx) and silhouette_sample_idx[sample_pos] < global_row: + sample_pos += 1 + start = sample_pos + while sample_pos < len(silhouette_sample_idx) and silhouette_sample_idx[sample_pos] < global_row + n: + sample_pos += 1 + if sample_pos > start: + idx_in_batch = silhouette_sample_idx[start:sample_pos] - global_row + for j in idx_in_batch: + jj = int(j) + sample_X.append(X[jj]) + sample_y.append(int(labels[jj])) + + out_rb = rb.append_column("cluster", pa.array(labels, type=pa.int32())) + writer.write_batch(out_rb) + global_row += labels.shape[0] + finally: + writer.close() + + logger.info(" Cluster distribution:") + total = int(counts.sum()) + for c, n in enumerate(counts.tolist()): + pct = (n / total * 100.0) if total > 0 else 0.0 + logger.info(" Cluster %d: %s profiles (%.1f%%)", c, f"{n:,}", pct) + + sil = None + if sample_X is not None and sample_y is not None and len(sample_y) >= 2: + Xs = np.asarray(sample_X, dtype=np.float64) + ys = np.asarray(sample_y, dtype=np.int32) + logger.info("Computing silhouette on sample_size=%s ...", f"{len(ys):,}") + sil = float(silhouette_score(Xs, ys, metric="euclidean")) + logger.info(" Silhouette score (sample): %.3f", sil) + + return counts, sil + + +# ---------------------------- +# Plotting + outputs +# ---------------------------- + + +def plot_centroids(centroids: np.ndarray, output_path: Path) -> None: + """Save a line plot of cluster centroids.""" + k = int(centroids.shape[0]) + n_timepoints = int(centroids.shape[1]) + + if n_timepoints == 48: + x = np.arange(0.5, 24.5, 0.5) + xlabel = "Hour of Day" + else: + x = np.arange(n_timepoints) + xlabel = "Time Interval" + + fig, ax = plt.subplots(figsize=(12, 6)) + for i in range(k): + ax.plot(x, centroids[i], label=f"Cluster {i}", linewidth=2) + + ax.set_xlabel(xlabel, fontsize=12) + ax.set_ylabel("Usage (normalized)" if n_timepoints else "Usage", fontsize=12) + ax.set_title("Cluster Centroids (MiniBatch K-Means)", fontsize=14) + ax.legend() + ax.grid(True, alpha=0.3) + + if n_timepoints == 48: + ax.set_xticks(range(0, 25, 4)) + ax.set_xlim(0, 24) + + output_path.parent.mkdir(parents=True, exist_ok=True) + plt.tight_layout() + plt.savefig(output_path, dpi=150) + plt.close() + logger.info(" Saved centroids plot: %s", output_path) + + +def save_centroids_parquet(centroids: np.ndarray, output_path: Path) -> None: + """Write centroids to Parquet as (cluster, centroid[list[float]]).""" + centroids_df = pl.DataFrame({ + "cluster": list(range(int(centroids.shape[0]))), + "centroid": 
[c.tolist() for c in centroids], + }) + centroids_df.write_parquet(output_path) + logger.info(" Saved centroids parquet: %s", output_path) + + +def save_metadata(metadata: dict, output_path: Path) -> None: + """Write run metadata and summary metrics to JSON.""" + with open(output_path, "w", encoding="utf-8") as f: + json.dump(metadata, f, indent=2) + logger.info(" Saved metadata: %s", output_path) + + +# ---------------------------- +# Main +# ---------------------------- + + +def main() -> int: + """CLI entrypoint.""" + parser = argparse.ArgumentParser( + description="Memory-Efficient K-Means Clustering (MiniBatch, PyArrow-batched)", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("--input", type=Path, required=True, help="sampled_profiles.parquet") + parser.add_argument("--output-dir", type=Path, required=True, help="Output directory") + parser.add_argument("--k", type=int, required=True, help="Number of clusters") + parser.add_argument("--normalize", action="store_true", help="Normalize profiles per row") + parser.add_argument("--normalize-method", choices=["minmax", "zscore", "none"], default="minmax") + parser.add_argument("--batch-size", type=int, default=50_000, help="Batch size (default: 50k)") + parser.add_argument("--n-init", type=int, default=3, help="Number of initializations (default: 3)") + parser.add_argument( + "--silhouette-sample-size", + type=int, + default=5_000, + help="Sample size for silhouette (default: 5000; set 0 to skip)", + ) + parser.add_argument("--random-state", type=int, default=42, help="Random seed") + + args = parser.parse_args() + args.output_dir.mkdir(parents=True, exist_ok=True) + + logger.info("=" * 70) + logger.info("MINIBATCH K-MEANS CLUSTERING (PYARROW-BATCHED)") + logger.info("=" * 70) + logger.info("Input: %s", args.input) + logger.info("k: %d", args.k) + logger.info("Batch size: %s", f"{args.batch_size:,}") + + eff_norm = args.normalize and args.normalize_method != "none" + logger.info("Normalize: %s (method=%s)", eff_norm, args.normalize_method if eff_norm else "none") + + n_profiles = parquet_num_rows(args.input) + logger.info("Profiles (from parquet metadata): %s", f"{n_profiles:,}") + + model = fit_minibatch_kmeans( + input_path=args.input, + k=args.k, + batch_size=args.batch_size, + n_init=args.n_init, + random_state=args.random_state, + normalize=bool(eff_norm), + normalize_method=args.normalize_method, + ) + centroids = model.cluster_centers_ + + silhouette_sample_idx = None + if args.silhouette_sample_size and args.silhouette_sample_size > 0: + if args.silhouette_sample_size >= n_profiles: + silhouette_sample_idx = np.arange(n_profiles, dtype=np.int64) + else: + rng = np.random.default_rng(args.random_state) + silhouette_sample_idx = rng.choice(n_profiles, size=args.silhouette_sample_size, replace=False) + silhouette_sample_idx.sort() + + assignments_path = args.output_dir / "cluster_assignments.parquet" + counts, sil_score = predict_and_write_assignments_streaming( + model=model, + input_path=args.input, + output_path=assignments_path, + batch_size=args.batch_size, + normalize=bool(eff_norm), + normalize_method=args.normalize_method, + silhouette_sample_idx=silhouette_sample_idx, + ) + + plot_centroids(centroids, args.output_dir / "cluster_centroids.png") + save_centroids_parquet(centroids, args.output_dir / "cluster_centroids.parquet") + + metadata = { + "k": int(args.k), + "n_profiles": int(n_profiles), + "n_timepoints": int(centroids.shape[1]), + "normalized": bool(eff_norm), + "normalize_method": 
args.normalize_method if eff_norm else "none", + "batch_size": int(args.batch_size), + "n_init": int(args.n_init), + "random_state": int(args.random_state), + "algorithm": "MiniBatchKMeans", + "inertia": float(model.inertia_), + "silhouette_score_sample": float(sil_score) if sil_score is not None else None, + "cluster_counts": {str(i): int(c) for i, c in enumerate(counts.tolist())}, + } + save_metadata(metadata, args.output_dir / "clustering_metadata.json") + + logger.info("=" * 70) + logger.info("CLUSTERING COMPLETE") + logger.info("=" * 70) + logger.info("Profiles: %s", f"{n_profiles:,}") + logger.info("Clusters: %d", args.k) + if sil_score is not None: + logger.info("Silhouette (sample): %.3f", sil_score) + logger.info("Output: %s", args.output_dir) + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/analysis/clustering/prepare_clustering_data_households.py b/analysis/clustering/prepare_clustering_data_households.py index 660ff31..da0cc5b 100644 --- a/analysis/clustering/prepare_clustering_data_households.py +++ b/analysis/clustering/prepare_clustering_data_households.py @@ -6,6 +6,8 @@ Transforms interval-level energy data into daily load profiles at the HOUSEHOLD (account) level for k-means clustering. +MODIFIED: Now accepts multiple input parquet files (e.g., split by time period). + What this does: 1. Validate schema (no full data scan) 2. Build/load manifests for accounts and dates (streaming, memory-safe) @@ -21,7 +23,7 @@ - Chunked streaming mode: fully streaming pipeline with per-chunk sinks * required for 10k+ households on constrained hardware - Profiles are 48 half-hourly kWh values in chronological order (00:30-24:00) - - Incomplete days (not 48 intervals) are dropped as “missing/irregular data” + - Incomplete days (not 48 intervals) are dropped as "missing/irregular data" Output files: - sampled_profiles.parquet: @@ -32,23 +34,6 @@ Manifest files (auto-generated alongside input, via smart_meter_analysis.manifests): - {input_stem}_accounts.parquet: unique (account_identifier, zip_code) pairs - {input_stem}_dates.parquet: unique (date, is_weekend, weekday) tuples - -Usage: - # Standard (5,000 households, 20 days) - python prepare_clustering_data_households.py \ - --input data/processed/comed_202308.parquet \ - --output-dir data/clustering \ - --sample-households 5000 \ - --sample-days 20 - - # Large dataset with chunked streaming (20,000 households) - python prepare_clustering_data_households.py \ - --input data/processed/comed_202308.parquet \ - --output-dir data/clustering \ - --sample-households 20000 \ - --sample-days 31 \ - --streaming \ - --chunk-size 2000 """ from __future__ import annotations @@ -61,18 +46,11 @@ import polars as pl -from smart_meter_analysis.manifests import ( - ensure_account_manifest, - ensure_date_manifest, -) +from smart_meter_analysis.manifests import ensure_account_manifest, ensure_date_manifest -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", -) +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) -# Required columns for household-level clustering REQUIRED_ENERGY_COLS = ["zip_code", "account_identifier", "datetime", "kwh"] REQUIRED_TIME_COLS = ["date", "hour", "is_weekend", "weekday"] @@ -80,8 +58,6 @@ # ============================================================================= # MEMORY INSTRUMENTATION # ============================================================================= - - 
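# Note: on Linux, /proc/self/status exposes resident memory as a line of the form
# "VmRSS:   123456 kB"; log_memory() below parses that value and logs it in MB,
# silently skipping on platforms where /proc is unavailable.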
def log_memory(label: str) -> None: """Log current RSS memory usage (Linux only).""" try: @@ -92,33 +68,29 @@ def log_memory(label: str) -> None: logger.info("[MEMORY] %s: %.0f MB", label, mem_mb) break except Exception as exc: - # Best-effort only; ignore on non-Linux or restricted environments logger.debug("Skipping memory log for %s: %s", label, exc) # ============================================================================= -# DATA INSPECTION & SAMPLING +# MULTI-FILE SUPPORT # ============================================================================= +def scan_multiple_parquet(paths: list[Path]) -> pl.LazyFrame: + """Create a unified LazyFrame from multiple parquet files.""" + if len(paths) == 1: + return pl.scan_parquet(paths[0]) + logger.info("Combining %d input files...", len(paths)) + return pl.concat([pl.scan_parquet(p) for p in paths]) -def validate_schema(path: Path) -> dict[str, Any]: - """ - Validate input data has required columns (schema-only check, no full data scan). - - Args: - path: Path to input parquet file. - - Returns: - Dictionary with: - - valid: bool - - errors: list[str] - - columns: list[str] - """ - lf = pl.scan_parquet(path) +# ============================================================================= +# DATA INSPECTION & SAMPLING +# ============================================================================= +def validate_schema(paths: list[Path]) -> dict[str, Any]: + """Validate input data has required columns (schema-only check).""" + lf = scan_multiple_parquet(paths) schema = lf.collect_schema() errors: list[str] = [] - missing_energy = [c for c in REQUIRED_ENERGY_COLS if c not in schema.names()] missing_time = [c for c in REQUIRED_TIME_COLS if c not in schema.names()] @@ -127,15 +99,22 @@ def validate_schema(path: Path) -> dict[str, Any]: if missing_time: errors.append(f"Missing time columns: {missing_time}") - return { - "valid": len(errors) == 0, - "errors": errors, - "columns": schema.names(), - } + return {"valid": len(errors) == 0, "errors": errors, "columns": schema.names()} + +def _as_polars_date_list(dates: list[Any]) -> list[Any]: + """ + Ensure Python/Polars date values are normalized to Polars Date dtype + before using in `.is_in()`. + """ + if not dates: + return dates + # This handles python datetime.date, python datetime.datetime, strings, etc. + return pl.Series("dates", dates).cast(pl.Date).to_list() -def get_metadata_and_samples( - input_path: Path, + +def get_metadata_and_samples( # noqa: C901 + input_paths: list[Path], sample_households: int | None, sample_days: int, day_strategy: Literal["stratified", "random"], @@ -143,31 +122,12 @@ def get_metadata_and_samples( ) -> dict[str, Any]: """ Get summary statistics and sample households + dates using MANIFESTS. - - This function: - - Computes summary stats via streaming (safe for large files) - - Builds/loads account and date manifests (streaming sink, memory-safe) - - Samples households and dates from the small manifest files - - Args: - input_path: Path to input parquet file. - sample_households: Number of households to sample (None = all). - sample_days: Number of days to sample. - day_strategy: 'stratified' (70/30 weekday/weekend) or 'random'. - seed: Random seed for reproducibility. 
- - Returns: - Dictionary with: - - summary: dict with n_rows, n_accounts, n_zip_codes, min_date, max_date - - accounts: list[str] of sampled account identifiers - - dates: list[date] of sampled dates """ logger.info("Scanning input data for metadata and sampling...") log_memory("start of get_metadata_and_samples") - lf = pl.scan_parquet(input_path) + lf = scan_multiple_parquet(input_paths) - # Summary stats (streaming-safe: single row output) logger.info(" Computing summary statistics...") summary_df = lf.select([ pl.len().alias("n_rows"), @@ -178,34 +138,43 @@ def get_metadata_and_samples( ]).collect(engine="streaming") summary = summary_df.to_dicts()[0] - # Early checks if summary["n_rows"] == 0: - raise ValueError(f"Input file {input_path} contains no rows.") + raise ValueError("Input files contain no rows.") logger.info(" %s rows", f"{summary['n_rows']:,}") logger.info(" %s households", f"{summary['n_accounts']:,}") logger.info(" %s ZIP+4 codes", f"{summary['n_zip_codes']:,}") logger.info(" Date range: %s to %s", summary["min_date"], summary["max_date"]) - # ========================================================================== - # KEY CHANGE: Use manifests instead of unique().collect() - # ========================================================================== + # Load manifests from first input + logger.info(" Loading manifests from first input file...") + account_manifest0 = ensure_account_manifest(input_paths[0]) + date_manifest0 = ensure_date_manifest(input_paths[0]) - logger.info(" Loading manifests...") - account_manifest = ensure_account_manifest(input_path) - date_manifest = ensure_date_manifest(input_path) + accounts_df = pl.read_parquet(account_manifest0) + dates_df = pl.read_parquet(date_manifest0) - # Read from small manifest files (fits easily in memory) - accounts_df = pl.read_parquet(account_manifest) - dates_df = pl.read_parquet(date_manifest) + # If multiple files, combine manifests from all files first + if len(input_paths) > 1: + logger.info(" Loading manifests from additional input files...") + for path in input_paths[1:]: + acc_manifest = ensure_account_manifest(path) + date_manifest_extra = ensure_date_manifest(path) + accounts_df = pl.concat([accounts_df, pl.read_parquet(acc_manifest)]).unique() + dates_df = pl.concat([dates_df, pl.read_parquet(date_manifest_extra)]).unique() - log_memory("after loading manifests") + # Apply July-only filter (after all dates are assembled) + # THIS IS JUST A BANDAID IT WILL GET FIXED ASAP + dates_df = dates_df.filter((pl.col("date") >= pl.date(2023, 7, 1)) & (pl.col("date") <= pl.date(2023, 7, 31))) + logger.info(" Dates available after July filter: %d", dates_df.height) if accounts_df.height == 0: raise ValueError("No account_identifier values found in manifest.") if dates_df.height == 0: raise ValueError("No dates found in manifest.") + log_memory("after loading manifests") + # Sample households if sample_households is not None and sample_households < len(accounts_df): accounts_df = accounts_df.sample(n=sample_households, shuffle=True, seed=seed) @@ -219,7 +188,6 @@ def get_metadata_and_samples( if day_strategy == "stratified": weekday_df = dates_df.filter(~pl.col("is_weekend")) weekend_df = dates_df.filter(pl.col("is_weekend")) - if weekday_df.height == 0 or weekend_df.height == 0: logger.warning(" Missing weekdays or weekends; falling back to random day sampling.") day_strategy = "random" @@ -238,7 +206,7 @@ def get_metadata_and_samples( weekend_df.sample(n=n_weekends, shuffle=True, seed=seed + 1)["date"].to_list() 
if n_weekends > 0 else [] ) - dates = sampled_weekdays + sampled_weekends + dates = sorted(sampled_weekdays + sampled_weekends) logger.info( " Sampled %d weekdays + %d weekend days (stratified)", len(sampled_weekdays), @@ -252,54 +220,24 @@ def get_metadata_and_samples( if not dates: raise ValueError("No dates were sampled; check input data and sampling settings.") + # Normalize date types now (so downstream `.is_in()` is reliable) + dates = _as_polars_date_list(dates) + log_memory("end of get_metadata_and_samples") + logger.info(" Sampled days requested: %d; sampled days returned: %d", sample_days, len(dates)) + logger.info(" Sampled dates: %s", dates) - return { - "summary": summary, - "accounts": accounts, - "dates": dates, - } + return {"summary": summary, "accounts": accounts, "dates": dates} # ============================================================================= # PROFILE CREATION # ============================================================================= - - -def create_household_profiles( - input_path: Path, - accounts: list[str], - dates: list[Any], -) -> pl.DataFrame: +def create_household_profiles(input_paths: list[Path], accounts: list[str], dates: list[Any]) -> pl.DataFrame: """ Create daily load profiles for selected households and dates (in-memory). - - Each profile is a 48-point vector (list) of half-hourly kWh values for one - household on one day. The profile list is in chronological order - (00:30 to 24:00). - - This uses a single filtered collect() over the full file, which is efficient - when sampling a moderate subset of households and dates. - - Args: - input_path: Path to input parquet file. - accounts: List of account_identifier values to include. - dates: List of dates to include. - - Returns: - DataFrame with columns: - - account_identifier - - zip_code - - date - - profile (list[float], length 48) - - is_weekend - - weekday """ - logger.info( - "Creating profiles for %s households x %d days...", - f"{len(accounts):,}", - len(dates), - ) + logger.info("Creating profiles for %s households x %d days...", f"{len(accounts):,}", len(dates)) log_memory("start of create_household_profiles") if not accounts: @@ -307,14 +245,22 @@ def create_household_profiles( if not dates: raise ValueError("No dates provided for profile creation.") - df = ( - pl.scan_parquet(input_path) - .filter(pl.col("account_identifier").is_in(accounts) & pl.col("date").is_in(dates)) - # Ensures profile list is chronological within each (account, date) - .sort(["account_identifier", "datetime"]) - .collect() + # ensure dates are Polars Date-compatible + dates = _as_polars_date_list(dates) + + lf = scan_multiple_parquet(input_paths).filter( + pl.col("account_identifier").is_in(accounts) & pl.col("date").is_in(dates) ) + # Diagnostic: how many of the sampled dates actually appear after filtering? 
+ present_dates = lf.select(pl.col("date").unique().sort()).collect(engine="streaming")["date"].to_list() + logger.info(" Sampled dates present in filtered interval data: %d / %d", len(present_dates), len(dates)) + if len(present_dates) < len(dates): + missing = sorted(set(dates) - set(present_dates)) + logger.warning(" Missing sampled dates after interval filter: %s", missing) + + df = lf.sort(["account_identifier", "datetime"]).collect() + if df.is_empty(): logger.warning(" No data found for selected households/dates") return pl.DataFrame() @@ -334,10 +280,7 @@ def create_household_profiles( n_dropped = n_before - len(profiles_df) if n_dropped > 0: - logger.info( - " Dropped %s incomplete profiles (missing or irregular data)", - f"{n_dropped:,}", - ) + logger.info(" Dropped %s incomplete profiles (missing or irregular data)", f"{n_dropped:,}") logger.info(" Created %s complete profiles", f"{len(profiles_df):,}") log_memory("end of create_household_profiles") @@ -346,7 +289,7 @@ def create_household_profiles( def _create_profiles_for_chunk_streaming( - input_path: Path, + input_paths: list[Path], accounts_chunk: list[str], dates: list[Any], chunk_idx: int, @@ -355,29 +298,20 @@ def _create_profiles_for_chunk_streaming( ) -> Path: """ Create profiles for a single chunk of households and write directly to parquet. - - Uses within-group sort (struct.sort_by) to preserve chronological order - without a global sort, which keeps the lazy plan streaming-compatible. - - Returns: - Path to the chunk parquet file. """ - logger.info( - " Chunk %d/%d: %s households...", - chunk_idx + 1, - total_chunks, - f"{len(accounts_chunk):,}", - ) + logger.info(" Chunk %d/%d: %s households...", chunk_idx + 1, total_chunks, f"{len(accounts_chunk):,}") log_memory(f"chunk {chunk_idx + 1} start") + # ensure dates are Polars Date-compatible + dates = _as_polars_date_list(dates) + chunk_path = tmp_dir / f"sampled_profiles_chunk_{chunk_idx:03d}.parquet" ( - pl.scan_parquet(input_path) + scan_multiple_parquet(input_paths) .filter(pl.col("account_identifier").is_in(accounts_chunk) & pl.col("date").is_in(dates)) .group_by(["account_identifier", "zip_code", "date"]) .agg([ - # Sort by datetime within group, then extract kwh values pl.struct(["datetime", "kwh"]).sort_by("datetime").struct.field("kwh").alias("profile"), pl.col("is_weekend").first(), pl.col("weekday").first(), @@ -390,12 +324,11 @@ def _create_profiles_for_chunk_streaming( log_memory(f"chunk {chunk_idx + 1} done") logger.info(" Wrote chunk parquet: %s", chunk_path) - return chunk_path def create_household_profiles_chunked_streaming( - input_path: Path, + input_paths: list[Path], accounts: list[str], dates: list[Any], output_path: Path, @@ -403,31 +336,15 @@ def create_household_profiles_chunked_streaming( ) -> int: """ Create daily load profiles using chunked streaming. - - - Splits households into chunks. - - For each chunk, runs a streaming filter → group_by → aggregate → sink. - - Concatenates all chunk files using a streaming concat. - - Deletes temporary chunk files. - - This avoids ever materializing profiles for all households in memory. - - Args: - input_path: Path to interval-level parquet file. - accounts: List of account_identifier values to include. - dates: List of dates to include. - output_path: Final output parquet path for all profiles. - chunk_size: Number of households per chunk. - - Returns: - Number of complete daily profiles created. 
""" if not accounts: - raise ValueError( - "No accounts provided for chunked streaming profile creation.", - ) + raise ValueError("No accounts provided for chunked streaming profile creation.") if not dates: raise ValueError("No dates provided for chunked streaming profile creation.") + # Defensive: ensure dates are Polars Date-compatible + dates = _as_polars_date_list(dates) + n_accounts = len(accounts) n_chunks = (n_accounts + chunk_size - 1) // chunk_size @@ -444,14 +361,13 @@ def create_household_profiles_chunked_streaming( tmp_dir = output_path.parent chunk_paths: list[Path] = [] - for i in range(n_chunks): start_idx = i * chunk_size end_idx = min((i + 1) * chunk_size, n_accounts) accounts_chunk = accounts[start_idx:end_idx] chunk_path = _create_profiles_for_chunk_streaming( - input_path=input_path, + input_paths=input_paths, accounts_chunk=accounts_chunk, dates=dates, chunk_idx=i, @@ -459,7 +375,6 @@ def create_household_profiles_chunked_streaming( tmp_dir=tmp_dir, ) - # Only keep non-empty chunk files if chunk_path.exists() and chunk_path.stat().st_size > 0: chunk_paths.append(chunk_path) @@ -469,21 +384,17 @@ def create_household_profiles_chunked_streaming( logger.warning("No profiles created in chunked streaming mode!") return 0 - # Stream-concatenate all chunk parquet files logger.info("Combining %d chunk files into %s", len(chunk_paths), output_path) log_memory("before streaming concat") - combined_lf = pl.concat([pl.scan_parquet(p) for p in chunk_paths]) - combined_lf.sink_parquet(output_path) + pl.concat([pl.scan_parquet(p) for p in chunk_paths]).sink_parquet(output_path) log_memory("after streaming concat") - # Count rows in final output n_profiles = pl.scan_parquet(output_path).select(pl.len()).collect()[0, 0] logger.info(" Created %s complete profiles (chunked streaming)", f"{n_profiles:,}") logger.info(" Saved to %s", output_path) - # Clean up temporary chunk files for p in chunk_paths: try: p.unlink(missing_ok=True) @@ -496,10 +407,8 @@ def create_household_profiles_chunked_streaming( # ============================================================================= # MAIN ORCHESTRATOR # ============================================================================= - - def prepare_clustering_data( - input_path: Path, + input_paths: list[Path], output_dir: Path, sample_households: int | None = None, sample_days: int = 20, @@ -508,41 +417,22 @@ def prepare_clustering_data( chunk_size: int = 5000, seed: int = 42, ) -> dict[str, Any]: - """ - Prepare household-level clustering data from interval parquet. - - Args: - input_path: Path to processed interval parquet file. - output_dir: Output directory for clustering files. - sample_households: Number of households to sample (None = all). - sample_days: Number of days to sample. - day_strategy: 'stratified' (70% weekdays) or 'random'. - streaming: If True, use chunked streaming mode for large samples. - chunk_size: Households per chunk when streaming is enabled. - seed: Random seed for reproducibility. - - Returns: - Statistics dictionary: - - n_profiles - - n_households - - n_zip4s - - n_dates - """ + """Prepare household-level clustering data from interval parquet.""" logger.info("=" * 70) logger.info("PREPARING HOUSEHOLD-LEVEL CLUSTERING DATA") + if len(input_paths) > 1: + logger.info("(MULTI-FILE MODE: %d input files)", len(input_paths)) if streaming: logger.info("(STREAMING MODE ENABLED, chunk_size=%d)", chunk_size) logger.info("=" * 70) log_memory("start of prepare_clustering_data") - # 1. 
Schema validation (cheap, no data load) - validation = validate_schema(input_path) + validation = validate_schema(input_paths) if not validation["valid"]: raise ValueError(f"Input validation failed: {validation['errors']}") - # 2. Metadata + sampling (uses manifests for memory safety) metadata = get_metadata_and_samples( - input_path=input_path, + input_paths=input_paths, sample_households=sample_households, sample_days=sample_days, day_strategy=day_strategy, @@ -552,38 +442,24 @@ def prepare_clustering_data( accounts = metadata["accounts"] dates = metadata["dates"] - # 3. Ensure output directory exists output_dir.mkdir(parents=True, exist_ok=True) profiles_path = output_dir / "sampled_profiles.parquet" - # 4. Create profiles (in-memory or chunked streaming) if streaming: n_profiles = create_household_profiles_chunked_streaming( - input_path=input_path, + input_paths=input_paths, accounts=accounts, dates=dates, output_path=profiles_path, chunk_size=chunk_size, ) - if n_profiles == 0: - raise ValueError( - "No profiles created in chunked streaming mode - check input data and sampling settings.", - ) - + raise ValueError("No profiles created in chunked streaming mode - check input data and sampling settings.") profiles_df = pl.read_parquet(profiles_path) else: - profiles_df = create_household_profiles( - input_path=input_path, - accounts=accounts, - dates=dates, - ) - + profiles_df = create_household_profiles(input_paths=input_paths, accounts=accounts, dates=dates) if profiles_df.is_empty(): - raise ValueError( - "No profiles created - check input data and sampling settings.", - ) - + raise ValueError("No profiles created - check input data and sampling settings.") profiles_df.select([ "account_identifier", "zip_code", @@ -594,17 +470,11 @@ def prepare_clustering_data( ]).write_parquet(profiles_path) logger.info(" Saved profiles: %s", profiles_path) - # 5. Save household → ZIP+4 mapping household_map = profiles_df.select(["account_identifier", "zip_code"]).unique() map_path = output_dir / "household_zip4_map.parquet" household_map.write_parquet(map_path) - logger.info( - " Saved household-ZIP+4 map: %s (%s households)", - map_path, - f"{len(household_map):,}", - ) + logger.info(" Saved household-ZIP+4 map: %s (%s households)", map_path, f"{len(household_map):,}") - # 6. 
Stats stats = { "n_profiles": len(profiles_df), "n_households": profiles_df["account_identifier"].n_unique(), @@ -630,109 +500,46 @@ def main() -> int: parser = argparse.ArgumentParser( description="Prepare household-level data for clustering", formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Standard run (5,000 households, 20 days) - python prepare_clustering_data_households.py \\ - --input data/processed/comed_202308.parquet \\ - --output-dir data/clustering \\ - --sample-households 5000 \\ - --sample-days 20 - - # Large dataset with chunked streaming - python prepare_clustering_data_households.py \\ - --input data/processed/comed_202308.parquet \\ - --output-dir data/clustering \\ - --sample-households 20000 \\ - --sample-days 31 \\ - --streaming \\ - --chunk-size 2000 - - # All households, fewer days - python prepare_clustering_data_households.py \\ - --input data/processed/comed_202308.parquet \\ - --output-dir data/clustering \\ - --sample-days 10 - - # Quick test - python prepare_clustering_data_households.py \\ - --input data/processed/comed_202308.parquet \\ - --output-dir data/clustering \\ - --sample-households 500 \\ - --sample-days 5 - """, ) parser.add_argument( - "--input", - type=Path, - required=True, - help="Path to processed interval parquet file.", + "--input", type=Path, required=True, nargs="+", help="Path(s) to processed interval parquet file(s)." ) parser.add_argument( - "--output-dir", - type=Path, - default=Path("data/clustering"), - help="Output directory (default: data/clustering).", + "--output-dir", type=Path, default=Path("data/clustering"), help="Output directory (default: data/clustering)." ) parser.add_argument( - "--sample-households", - type=int, - default=None, - help="Number of households to sample (default: all).", + "--sample-households", type=int, default=None, help="Number of households to sample (default: all)." ) + parser.add_argument("--sample-days", type=int, default=20, help="Number of days to sample (default: 20).") parser.add_argument( - "--sample-days", - type=int, - default=20, - help="Number of days to sample (default: 20).", + "--day-strategy", choices=["stratified", "random"], default="stratified", help="Day sampling strategy." ) + parser.add_argument("--seed", type=int, default=42, help="Random seed.") + parser.add_argument("--streaming", action="store_true", help="Use chunked streaming mode (10k+ households).") parser.add_argument( - "--day-strategy", - choices=["stratified", "random"], - default="stratified", - help="Day sampling: stratified (70% weekday) or random.", - ) - parser.add_argument( - "--seed", - type=int, - default=42, - help="Random seed (default: 42).", - ) - parser.add_argument( - "--streaming", - action="store_true", - help="Use chunked streaming mode for large household samples (10k+).", - ) - parser.add_argument( - "--chunk-size", - type=int, - default=5000, - help="Households per chunk when --streaming is enabled (default: 5000).", + "--chunk-size", type=int, default=5000, help="Households per chunk when --streaming is enabled." 
) args = parser.parse_args() - if not args.input.exists(): - logger.error("Input file not found: %s", args.input) - return 1 - - try: - prepare_clustering_data( - input_path=args.input, - output_dir=args.output_dir, - sample_households=args.sample_households, - sample_days=args.sample_days, - day_strategy=args.day_strategy, - streaming=args.streaming, - chunk_size=args.chunk_size, - seed=args.seed, - ) - return 0 - except Exception as exc: - logger.error("Failed: %s", exc) - # Re-raise so stack traces are visible in logs when run via a pipeline - raise + input_paths = args.input if isinstance(args.input, list) else [args.input] + for path in input_paths: + if not path.exists(): + logger.error("Input file not found: %s", path) + return 1 + + prepare_clustering_data( + input_paths=input_paths, + output_dir=args.output_dir, + sample_households=args.sample_households, + sample_days=args.sample_days, + day_strategy=args.day_strategy, + streaming=args.streaming, + chunk_size=args.chunk_size, + seed=args.seed, + ) + return 0 if __name__ == "__main__": diff --git a/analysis/clustering/stage2_blockgroup_regression.py b/analysis/clustering/stage2_blockgroup_regression.py index 5a4df60..798ce5c 100644 --- a/analysis/clustering/stage2_blockgroup_regression.py +++ b/analysis/clustering/stage2_blockgroup_regression.py @@ -58,20 +58,16 @@ format="%(asctime)s - %(levelname)s - %(message)s", ) -# Default predictors (for convergence and interpretability) DEFAULT_PREDICTORS = [ + "Owner_Occupied_Pct", + "Average_Household_Size", + "Old_Building_Pct", + "Heat_Electric_Pct", "Median_Household_Income", - "Median_Age", "Urban_Percent", - "Total_Households", ] -# ============================================================================= -# 1. LOAD CLUSTER ASSIGNMENTS (HOUSEHOLD-DAY LEVEL) -# ============================================================================= - - def load_cluster_assignments_household_day(path: Path) -> tuple[pl.DataFrame, dict]: """ Load household-day cluster assignments. @@ -113,7 +109,6 @@ def load_cluster_assignments_household_day(path: Path) -> tuple[pl.DataFrame, di n_clusters, ) - # This doesn't affect the regression - just useful context dominance_stats = _compute_dominance_stats(raw) logger.info( @@ -135,16 +130,12 @@ def _compute_dominance_stats(df: pl.DataFrame) -> dict: Returns summary statistics across all households. """ - # Count days per (household, cluster) counts = df.group_by(["account_identifier", "cluster"]).agg(pl.len().alias("days_in_cluster")) - # Total days per household totals = counts.group_by("account_identifier").agg(pl.col("days_in_cluster").sum().alias("n_days")) - # Max days in any single cluster per household max_days = counts.group_by("account_identifier").agg(pl.col("days_in_cluster").max().alias("max_days_in_cluster")) - # Compute dominance dominance_df = max_days.join(totals, on="account_identifier").with_columns( (pl.col("max_days_in_cluster") / pl.col("n_days")).alias("dominance") ) @@ -164,11 +155,6 @@ def _compute_dominance_stats(df: pl.DataFrame) -> dict: } -# ============================================================================= -# 2. CROSSWALK AND BLOCK GROUP MAPPING -# ============================================================================= - - def load_crosswalk(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame: """ Load ZIP+4 → Census block-group crosswalk for the ZIP+4s in our data. 
@@ -200,7 +186,6 @@ def load_crosswalk(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame: logger.warning(" Crosswalk is empty after filtering for sample ZIP+4s.") return crosswalk - # Fan-out diagnostic fanout = crosswalk.group_by("zip_code").agg(pl.n_unique("block_group_geoid").alias("n_block_groups")) max_fanout = int(fanout["n_block_groups"].max()) @@ -247,11 +232,6 @@ def attach_block_groups_to_household_days( return df -# ============================================================================= -# 3. AGGREGATE TO BLOCK-GROUP x CLUSTER COUNTS -# ============================================================================= - - def aggregate_blockgroup_cluster_counts(df: pl.DataFrame) -> pl.DataFrame: """ Aggregate household-day observations to block-group x cluster counts. @@ -270,19 +250,16 @@ def aggregate_blockgroup_cluster_counts(df: pl.DataFrame) -> pl.DataFrame: """ logger.info("Aggregating to block-group x cluster counts (household-day units)...") - # Counts per (block_group, cluster) counts = df.group_by(["block_group_geoid", "cluster"]).agg([ pl.len().alias("n_obs"), pl.col("account_identifier").n_unique().alias("n_households"), ]) - # Totals per block_group totals = df.group_by("block_group_geoid").agg([ pl.len().alias("total_obs"), pl.col("account_identifier").n_unique().alias("total_households"), ]) - # Merge and compute cluster shares bg_counts = counts.join(totals, on="block_group_geoid", how="left").with_columns( (pl.col("n_obs") / pl.col("total_obs")).alias("cluster_share") ) @@ -301,11 +278,6 @@ def aggregate_blockgroup_cluster_counts(df: pl.DataFrame) -> pl.DataFrame: return bg_counts -# ============================================================================= -# 4. CENSUS DATA -# ============================================================================= - - def fetch_or_load_census( cache_path: Path, state_fips: str = "17", @@ -328,6 +300,43 @@ def fetch_or_load_census( return census_df +def create_derived_variables(census_df: pl.DataFrame) -> pl.DataFrame: + """Create derived percentage variables from raw Census counts.""" + logger.info("Creating derived variables...") + + df = census_df.with_columns([ + (pl.col("Owner_Occupied") / pl.col("Occupied_Housing_Units") * 100).alias("Owner_Occupied_Pct"), + (pl.col("Heat_Electric") / pl.col("Total_Households") * 100).alias("Heat_Electric_Pct"), + ( + ( + pl.col("Built_1960_1969") + + pl.col("Built_1950_1959") + + pl.col("Built_1940_1949") + + pl.col("Built_1939_Earlier") + ) + / pl.col("Total_Housing_Units") + * 100 + ).alias("Old_Building_Pct"), + ]) + + df = df.with_columns([ + pl.when(pl.col("Owner_Occupied_Pct").is_nan()) + .then(None) + .otherwise(pl.col("Owner_Occupied_Pct")) + .alias("Owner_Occupied_Pct"), + pl.when(pl.col("Heat_Electric_Pct").is_nan()) + .then(None) + .otherwise(pl.col("Heat_Electric_Pct")) + .alias("Heat_Electric_Pct"), + pl.when(pl.col("Old_Building_Pct").is_nan()) + .then(None) + .otherwise(pl.col("Old_Building_Pct")) + .alias("Old_Building_Pct"), + ]) + + return df + + def attach_census_to_blockgroups(bg_counts: pl.DataFrame, census_df: pl.DataFrame) -> pl.DataFrame: """Attach Census demographics to block-group cluster counts.""" logger.info("Joining Census data to block-group counts...") @@ -349,11 +358,6 @@ def attach_census_to_blockgroups(bg_counts: pl.DataFrame, census_df: pl.DataFram return demo -# ============================================================================= -# 5. 
PREPARE REGRESSION DATA -# ============================================================================= - - def prepare_regression_dataset( demo_df: pl.DataFrame, predictors: list[str], @@ -369,7 +373,6 @@ def prepare_regression_dataset( """ logger.info("Preparing regression dataset...") - # Filter by minimum observations (household-days) df = demo_df.filter(pl.col("total_obs") >= min_obs_per_bg) logger.info( " After min_obs filter (>=%d): %s block groups", @@ -377,7 +380,6 @@ def prepare_regression_dataset( f"{df['block_group_geoid'].n_unique():,}", ) - # Require block groups to have multiple clusters represented nonzero_counts = ( df.filter(pl.col("n_obs") > 0).group_by("block_group_geoid").agg(pl.len().alias("n_nonzero_clusters")) ) @@ -394,7 +396,6 @@ def prepare_regression_dataset( f"{df['block_group_geoid'].n_unique():,}", ) - # Filter predictors available_predictors: list[str] = [] for p in predictors: if p not in df.columns: @@ -421,11 +422,6 @@ def prepare_regression_dataset( return df, available_predictors -# ============================================================================= -# 6. MULTINOMIAL REGRESSION -# ============================================================================= - - def run_multinomial_regression( reg_df: pl.DataFrame, predictors: list[str], @@ -454,12 +450,10 @@ def run_multinomial_regression( logger.info("Running multinomial logistic regression...") logger.info(" Weighting by: %s (household-day observations)", weight_col) - # Extract arrays X = reg_df.select(predictors).to_numpy().astype(np.float64) y = reg_df.get_column(outcome).to_numpy() weights = reg_df.get_column(weight_col).to_numpy().astype(np.float64) - # Drop rows with NaN in predictors nan_mask = np.isnan(X).any(axis=1) if nan_mask.sum() > 0: logger.warning(" Dropping %s rows with NaN predictors", f"{nan_mask.sum():,}") @@ -470,7 +464,6 @@ def run_multinomial_regression( n_block_groups = reg_df.filter(~pl.any_horizontal(pl.col(predictors).is_null()))["block_group_geoid"].n_unique() - # Standardize or use raw units if standardize: logger.info(" Standardizing predictors...") scaler = StandardScaler() @@ -479,10 +472,8 @@ def run_multinomial_regression( logger.info(" Using raw predictor units (no standardization).") X_transformed = X - # Add intercept X_with_const = sm.add_constant(X_transformed) - # Expand rows by integer weights weight_ints = np.maximum(np.round(weights).astype(int), 1) X_expanded = np.repeat(X_with_const, weight_ints, axis=0) y_expanded = np.repeat(y, weight_ints) @@ -497,7 +488,6 @@ def run_multinomial_regression( model = sm.MNLogit(y_expanded, X_expanded) result = model.fit(method="newton", maxiter=100, disp=False) - # Extract results classes = sorted(np.unique(y).tolist()) baseline = classes[0] param_names = ["const", *predictors] @@ -514,7 +504,6 @@ def run_multinomial_regression( p_values[key] = {name: float(result.pvalues[j, i]) for j, name in enumerate(param_names)} odds_ratios[key] = {name: float(np.exp(result.params[j, i])) for j, name in enumerate(param_names)} - # Baseline cluster baseline_key = f"cluster_{baseline}" coefficients[baseline_key] = dict.fromkeys(param_names, 0.0) std_errors[baseline_key] = dict.fromkeys(param_names, 0.0) @@ -547,11 +536,6 @@ def run_multinomial_regression( } -# ============================================================================= -# 7. 
REPORT GENERATION -# ============================================================================= - - def generate_report( results: dict[str, object], cluster_distribution: pl.DataFrame, @@ -636,11 +620,6 @@ def generate_report( print("\n" + text) -# ============================================================================= -# 8. CLI -# ============================================================================= - - def main() -> int: parser = argparse.ArgumentParser( description="Stage 2: Block-group-level regression using household-day units.", @@ -696,18 +675,14 @@ def main() -> int: print("STAGE 2: BLOCK-GROUP REGRESSION (HOUSEHOLD-DAY UNITS)") print("=" * 70) - # 1. Load household-day cluster assignments (NO dominant cluster reduction) household_days, dominance_stats = load_cluster_assignments_household_day(args.clusters) - # 2. Load crosswalk and attach block groups zip_codes = household_days["zip_code"].unique().to_list() crosswalk = load_crosswalk(args.crosswalk, zip_codes) household_days_bg = attach_block_groups_to_household_days(household_days, crosswalk) - # 3. Aggregate to block-group x cluster counts bg_counts = aggregate_blockgroup_cluster_counts(household_days_bg) - # 4. Load Census and attach demographics census_df = fetch_or_load_census( cache_path=args.census_cache, state_fips=args.state_fips, @@ -716,9 +691,10 @@ def main() -> int: ) logger.info(" Census: %s block groups, %s columns", f"{len(census_df):,}", len(census_df.columns)) + census_df = create_derived_variables(census_df) + demo_df = attach_census_to_blockgroups(bg_counts, census_df) - # 5. Prepare regression dataset reg_df, predictors = prepare_regression_dataset( demo_df=demo_df, predictors=args.predictors, @@ -730,29 +706,24 @@ def main() -> int: logger.error("No data after filtering") return 1 - # Save regression dataset reg_df.write_parquet(args.output_dir / "regression_data_blockgroups.parquet") logger.info("Saved regression data to %s", args.output_dir / "regression_data_blockgroups.parquet") - # 6. Run regression (weighted by n_obs = household-day counts) results = run_multinomial_regression( reg_df=reg_df, predictors=predictors, outcome="cluster", - weight_col="n_obs", # household-day observations + weight_col="n_obs", standardize=args.standardize, ) - # Add dominance stats to results for reference results["dominance_stats"] = dominance_stats - # Save results model_summary = results.pop("model_summary") with open(args.output_dir / "regression_results_blockgroups.json", "w") as f: json.dump(results, f, indent=2) (args.output_dir / "statsmodels_summary.txt").write_text(model_summary) - # Generate report cluster_dist = ( reg_df.group_by("cluster") .agg(pl.col("n_obs").sum()) diff --git a/analysis/clustering/stage2_logratio_regression.py b/analysis/clustering/stage2_logratio_regression.py new file mode 100644 index 0000000..f8aac40 --- /dev/null +++ b/analysis/clustering/stage2_logratio_regression.py @@ -0,0 +1,934 @@ +#!/usr/bin/env python3 +""" +Stage 2: Block-Group-Level Log-Ratio Regression of Cluster Composition (HOUSEHOLD-DAY UNITS) + +Goal +----- +Model how Census block-group demographics are associated with the *composition* +of household-day observations across load-profile clusters, using log-ratio +regression (ALR / additive log-ratio). + +Unit of Analysis +---------------- +One row per Census BLOCK GROUP (not one row per block_group x cluster). + +Data Flow +--------- +1. Load household-day cluster assignments from Stage 1 (one row per household-day) +2. 
Join to Census block groups via ZIP+4 → block group crosswalk (1-to-1 enforced) +3. Aggregate to block-group-level cluster composition (wide format) +4. Join block groups to Census demographics +5. Create smoothed proportions and log-ratios vs a baseline cluster: + y_k = log(p_k / p_base) +6. Fit separate WLS regressions for each non-baseline cluster: + y_k ~ demographics with weights = total_obs (household-day count) +7. Fit OLS models (robustness check, unweighted) + +Outputs +------- +- regression_data_blockgroups_wide.parquet +- regression_results_logratio_blockgroups.json +- statsmodels_summaries_wls.txt +- statsmodels_summaries_ols.txt +- regression_report_logratio_blockgroups.txt + +Usage +----- + python stage2_logratio_regression.py \ + --clusters data/clustering/results/cluster_assignments.parquet \ + --crosswalk data/reference/2023_comed_zip4_census_crosswalk.txt \ + --census-cache data/reference/census_17_2023.parquet \ + --output-dir data/clustering/results/stage2_blockgroups_logratio \ + --baseline-cluster 1 +""" + +from __future__ import annotations + +import argparse +import json +import logging +from pathlib import Path + +import numpy as np +import polars as pl +import statsmodels.api as sm +from sklearn.preprocessing import StandardScaler + +from smart_meter_analysis.census import fetch_census_data + +logger = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) + +DEFAULT_PREDICTORS = [ + "Owner_Occupied_Pct", + "Average_Household_Size", + "Old_Building_Pct", + "Heat_Electric_Pct", + "Median_Household_Income", + "Urban_Percent", +] + + +def load_cluster_assignments_household_day(path: Path) -> tuple[pl.DataFrame, dict]: + """ + Load household-day cluster assignments. + + Returns the raw Stage 1 output: one row per (household, day) with cluster label. 
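+
+    Only the columns needed downstream (account_identifier, zip_code, cluster) are
+    materialized; the summary and dominance statistics are computed in separate passes
+    over the Parquet scan, so the full file never has to be loaded at once.
+
+    Illustrative input row (hypothetical values):
+        account_identifier="0123456789", zip_code="60601-0001", date=2023-07-15, cluster=2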
+ + Returns + ------- + df : pl.DataFrame + One row per household-day with columns: + - account_identifier + - zip_code + - date (if present) + - cluster + + dominance_stats : dict + Summary statistics on how consistently households stay in one cluster + (for reporting/interpretation, not used in regression) + """ + logger.info("Loading cluster assignments from %s", path) + + # For large files, compute stats without loading everything + logger.info(" Computing statistics in streaming mode...") + lf = pl.scan_parquet(path) + + # Quick validation + schema = lf.collect_schema() + required = ["account_identifier", "zip_code", "cluster"] + missing = [c for c in required if c not in schema.names()] + if missing: + raise ValueError(f"cluster_assignments missing required columns: {missing}") + + # Compute basic stats without loading full data + stats_df = lf.select([ + pl.len().alias("n_household_days"), + pl.col("account_identifier").n_unique().alias("n_households"), + pl.col("cluster").n_unique().alias("n_clusters"), + ]).collect() + + stats = stats_df.to_dicts()[0] + + logger.info( + " Found: %s household-day observations, %s households, %s clusters", + f"{stats['n_household_days']:,}", + f"{stats['n_households']:,}", + stats["n_clusters"], + ) + + # Compute dominance stats with streaming + logger.info(" Computing dominance statistics...") + dominance_stats = _compute_dominance_stats_streaming(path) + + logger.info( + " Dominance stats: mean=%.1f%%, median=%.1f%%, >50%%: %.1f%% of households", + dominance_stats["dominance_mean"] * 100, + dominance_stats["dominance_median"] * 100, + dominance_stats["pct_above_50"], + ) + + # Now load only what we need for regression: just the columns + logger.info(" Loading data for regression (selecting needed columns only)...") + raw = lf.select(required).collect(streaming=True) + + return raw, dominance_stats + + +def _compute_dominance_stats_streaming(path: Path) -> dict: + """ + Compute dominance stats using streaming aggregation to avoid OOM. + """ + lf = pl.scan_parquet(path) + + # Compute per-household cluster counts + counts = ( + lf.group_by(["account_identifier", "cluster"]).agg(pl.len().alias("days_in_cluster")).collect(streaming=True) + ) + + # Get max days per household + max_days = counts.group_by("account_identifier").agg(pl.col("days_in_cluster").max().alias("max_days_in_cluster")) + + # Get total days per household + totals = counts.group_by("account_identifier").agg(pl.col("days_in_cluster").sum().alias("n_days")) + + # Join and compute dominance + dominance = max_days.join(totals, on="account_identifier") + dominance = dominance.with_columns((pl.col("max_days_in_cluster") / pl.col("n_days")).alias("dominance")) + + dom_series = dominance["dominance"] + + return { + "n_households": int(dominance.height), + "dominance_mean": float(dom_series.mean()), + "dominance_median": float(dom_series.median()), + "dominance_std": float(dom_series.std()), + "dominance_min": float(dom_series.min()), + "dominance_max": float(dom_series.max()), + "pct_above_50": float((dom_series > 0.5).mean() * 100), + "pct_above_67": float((dom_series > 0.67).mean() * 100), + "pct_above_80": float((dom_series > 0.80).mean() * 100), + } + + +def load_crosswalk_one_to_one(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame: + """ + Load ZIP+4 → Census block-group crosswalk with deterministic 1-to-1 mapping. + + When fan-out exists (ZIP+4 maps to multiple block groups), + choose smallest GEOID per ZIP+4 to avoid double-counting household-day observations. 
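+
+    Illustrative example (hypothetical GEOIDs): if ZIP+4 "60601-0001" maps to block
+    groups "170318391001" and "170318391002", the rule keeps "170318391001" (the
+    lexicographically smallest GEOID), so each household-day observation is attached
+    to exactly one block group.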
+ + This is the only valid approach when crosswalk weights are unavailable. + """ + logger.info("Loading crosswalk from %s", crosswalk_path) + + crosswalk = ( + pl.scan_csv(crosswalk_path, separator="\t", infer_schema_length=10000) + .with_columns([ + (pl.col("Zip").cast(pl.Utf8).str.zfill(5) + "-" + pl.col("Zip4").cast(pl.Utf8).str.zfill(4)).alias( + "zip_code" + ), + pl.col("CensusKey2023").cast(pl.Utf8).str.zfill(15).str.slice(0, 12).alias("block_group_geoid"), + ]) + .filter(pl.col("zip_code").is_in(zip_codes)) + .select(["zip_code", "block_group_geoid"]) + .collect() + ) + + logger.info( + " Matched %s of %s ZIP+4 codes", + f"{crosswalk['zip_code'].n_unique():,}", + f"{len(set(zip_codes)):,}", + ) + + if crosswalk.is_empty(): + logger.warning(" Crosswalk is empty after filtering for sample ZIP+4s.") + return crosswalk + + # Check for fan-out + fanout = crosswalk.group_by("zip_code").agg(pl.n_unique("block_group_geoid").alias("n_block_groups")) + max_fanout = int(fanout["n_block_groups"].max()) + + if max_fanout > 1: + n_fanout = fanout.filter(pl.col("n_block_groups") > 1).height + pct_fanout = (n_fanout / len(fanout)) * 100 + + logger.warning( + " ZIP+4 fan-out detected: %s ZIP+4s (%.1f%%) map to multiple block groups (max=%d per ZIP+4)", + f"{n_fanout:,}", + pct_fanout, + max_fanout, + ) + logger.warning(" Applying deterministic 1-to-1 mapping: selecting smallest GEOID per ZIP+4") + logger.warning(" This prevents double-counting household-day observations") + + # Deterministic resolution: smallest GEOID per ZIP+4 + crosswalk = ( + crosswalk.sort(["zip_code", "block_group_geoid"]) + .group_by("zip_code") + .agg(pl.col("block_group_geoid").first()) + ) + + logger.info( + " After 1-to-1 resolution: %s ZIP+4 codes → %s unique mappings", + f"{len(crosswalk):,}", + f"{len(crosswalk):,}", + ) + else: + logger.info(" Crosswalk is already 1-to-1: each ZIP+4 maps to exactly one block group.") + + return crosswalk + + +def attach_block_groups_to_household_days( + household_days: pl.DataFrame, + crosswalk: pl.DataFrame, +) -> pl.DataFrame: + """ + Attach block-group GEOIDs to household-day observations via ZIP+4. + + Input: one row per household-day + Output: one row per household-day with block_group_geoid attached + """ + logger.info("Joining household-day observations to block groups...") + + df = household_days.join(crosswalk, on="zip_code", how="left") + + n_before = len(df) + missing = df.filter(pl.col("block_group_geoid").is_null()).height + + if missing > 0: + pct = missing / n_before * 100 + logger.warning(" %s (%.1f%%) observations missing block_group - dropping", f"{missing:,}", pct) + df = df.filter(pl.col("block_group_geoid").is_not_null()) + + logger.info( + " %s household-day observations across %s block groups", + f"{len(df):,}", + f"{df['block_group_geoid'].n_unique():,}", + ) + + return df + + +def aggregate_blockgroup_cluster_composition(df: pl.DataFrame) -> pl.DataFrame: + """ + Aggregate household-day observations to block-group-level cluster composition (wide). 
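+
+    Worked example (hypothetical counts): a block group with 120 household-day
+    observations split 80 / 30 / 10 across clusters 1-3 gets n_cluster_1=80,
+    n_cluster_2=30, n_cluster_3=10 and p_cluster_1=0.667, p_cluster_2=0.250,
+    p_cluster_3=0.083.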
+ + Output: one row per block_group_geoid with: + - total_obs + - total_households + - n_cluster_ + - p_cluster_ + """ + logger.info("Aggregating to block-group cluster composition (wide; household-day units)...") + + totals = df.group_by("block_group_geoid").agg([ + pl.len().alias("total_obs"), + pl.col("account_identifier").n_unique().alias("total_households"), + ]) + + counts_long = df.group_by(["block_group_geoid", "cluster"]).agg(pl.len().alias("n_obs")) + + counts_wide = ( + counts_long.with_columns(pl.col("cluster").cast(pl.Utf8)) + .pivot( + values="n_obs", + index="block_group_geoid", + columns="cluster", + aggregate_function="first", + ) + .fill_null(0) + ) + + cluster_cols = [c for c in counts_wide.columns if c != "block_group_geoid"] + counts_wide = counts_wide.rename({c: f"n_cluster_{c}" for c in cluster_cols}) + + out = totals.join(counts_wide, on="block_group_geoid", how="left").fill_null(0) + + n_cols = [c for c in out.columns if c.startswith("n_cluster_")] + out = out.with_columns([ + (pl.col(c) / pl.col("total_obs")).alias(c.replace("n_cluster_", "p_cluster_")) for c in n_cols + ]) + + logger.info( + " Created %s block-group rows; total obs=%s; total households=%s", + f"{len(out):,}", + f"{int(out['total_obs'].sum()):,}", + f"{int(out['total_households'].sum()):,}", + ) + return out + + +def fetch_or_load_census( + cache_path: Path, + state_fips: str = "17", + acs_year: int = 2023, + force_fetch: bool = False, +) -> pl.DataFrame: + """Fetch Census data from API or load from cache.""" + if cache_path.exists() and not force_fetch: + logger.info("Loading Census data from cache: %s", cache_path) + return pl.read_parquet(cache_path) + + logger.info("Fetching Census data from API (state=%s, year=%s)...", state_fips, acs_year) + + census_df = fetch_census_data(state_fips=state_fips, acs_year=acs_year) + + cache_path.parent.mkdir(parents=True, exist_ok=True) + census_df.write_parquet(cache_path) + logger.info(" Cached Census data to %s", cache_path) + + return census_df + + +def create_derived_variables(census_df: pl.DataFrame) -> pl.DataFrame: + """Create derived percentage variables from raw Census counts.""" + logger.info("Creating derived variables...") + + df = census_df.with_columns([ + (pl.col("Owner_Occupied") / pl.col("Occupied_Housing_Units") * 100).alias("Owner_Occupied_Pct"), + (pl.col("Heat_Electric") / pl.col("Total_Households") * 100).alias("Heat_Electric_Pct"), + ( + ( + pl.col("Built_1960_1969") + + pl.col("Built_1950_1959") + + pl.col("Built_1940_1949") + + pl.col("Built_1939_Earlier") + ) + / pl.col("Total_Housing_Units") + * 100 + ).alias("Old_Building_Pct"), + ]) + + df = df.with_columns([ + pl.when(pl.col("Owner_Occupied_Pct").is_nan()) + .then(None) + .otherwise(pl.col("Owner_Occupied_Pct")) + .alias("Owner_Occupied_Pct"), + pl.when(pl.col("Heat_Electric_Pct").is_nan()) + .then(None) + .otherwise(pl.col("Heat_Electric_Pct")) + .alias("Heat_Electric_Pct"), + pl.when(pl.col("Old_Building_Pct").is_nan()) + .then(None) + .otherwise(pl.col("Old_Building_Pct")) + .alias("Old_Building_Pct"), + ]) + + return df + + +def attach_census_to_blockgroups(bg_comp: pl.DataFrame, census_df: pl.DataFrame) -> pl.DataFrame: + """Attach Census demographics to block-group composition (wide).""" + logger.info("Joining Census data to block-group composition...") + + census_df = census_df.with_columns(pl.col("GEOID").cast(pl.Utf8).str.zfill(12).alias("block_group_geoid")) + + demo = bg_comp.join(census_df, on="block_group_geoid", how="left") + + n_before = len(demo) + missing = 
demo.filter(pl.col("GEOID").is_null()).height + + if missing > 0: + pct = missing / n_before * 100 + logger.warning(" %s (%.1f%%) rows missing Census data - dropping", f"{missing:,}", pct) + demo = demo.filter(pl.col("GEOID").is_not_null()) + + logger.info(" Demographics attached for %s block groups", f"{demo['block_group_geoid'].n_unique():,}") + + return demo + + +def prepare_regression_dataset_wide( + demo_df: pl.DataFrame, + predictors: list[str], + min_obs_per_bg: int = 50, +) -> tuple[pl.DataFrame, list[str]]: + """ + Prepare block-group (wide) dataset for log-ratio regression. + + Filters: + - Block groups with fewer than min_obs_per_bg household-day observations + - Drops predictors with too many nulls + - Drops rows with any null predictor values (conservative / statsmodels-friendly) + """ + logger.info("Preparing regression dataset (wide)...") + + df = demo_df.filter(pl.col("total_obs") >= min_obs_per_bg) + logger.info( + " After min_obs filter (>=%d): %s block groups", + min_obs_per_bg, + f"{df['block_group_geoid'].n_unique():,}", + ) + + available_predictors: list[str] = [] + for p in predictors: + if p not in df.columns: + logger.warning(" Predictor not found: %s", p) + continue + null_rate = df[p].null_count() / len(df) + if null_rate > 0.5: + logger.warning(" Predictor %s has %.0f%% nulls - excluding", p, null_rate * 100) + continue + available_predictors.append(p) + + if not available_predictors: + raise ValueError("No valid predictors available") + + # Drop rows with any null predictor values + df = df.filter(~pl.any_horizontal(pl.col(available_predictors).is_null())) + logger.info( + " After dropping rows with null predictors: %s block groups", + f"{df['block_group_geoid'].n_unique():,}", + ) + + logger.info(" Using %d predictors: %s", len(available_predictors), available_predictors) + + return df, available_predictors + + +def choose_baseline_cluster_from_household_days(household_days: pl.DataFrame) -> str: + """ + Choose baseline cluster as the most frequent cluster by household-day observations. + Returns as string (to match pivot-derived cluster column suffixes). + """ + dist = household_days.group_by("cluster").agg(pl.len().alias("n")).sort("n", descending=True) + baseline = dist["cluster"][0] + logger.info(" Auto-selected baseline: cluster %s (most frequent by household-days)", baseline) + return str(baseline) + + +def add_smoothed_proportions_and_logratios( + df: pl.DataFrame, + baseline_cluster: str, + alpha: float = 0.5, +) -> tuple[pl.DataFrame, list[str], list[str]]: + """ + Add smoothed proportions and log-ratios vs baseline. + + Smoothing is applied at the count level: + n_s_k = n_k + alpha + total_s = total_obs + alpha*K + p_s_k = n_s_k / total_s + + Then outcomes: + log_ratio_k = log(p_s_k / p_s_base) + """ + n_cols = sorted([c for c in df.columns if c.startswith("n_cluster_")]) + if not n_cols: + raise ValueError("No n_cluster_ columns found. 
Did you run wide aggregation?") + + clusters = [c.replace("n_cluster_", "") for c in n_cols] + if baseline_cluster not in clusters: + raise ValueError(f"Baseline cluster {baseline_cluster} not found in clusters={clusters}") + + K = len(clusters) + nonbase = [k for k in clusters if k != baseline_cluster] + + logger.info("Adding smoothed proportions and log-ratios (alpha=%.2f)...", alpha) + logger.info(" Clusters: %s (K=%d)", clusters, K) + logger.info(" Baseline: %s", baseline_cluster) + logger.info(" Non-baseline: %s", nonbase) + + df2 = df.with_columns([(pl.col(f"n_cluster_{k}") + alpha).alias(f"n_s_{k}") for k in clusters]).with_columns( + (pl.col("total_obs") + alpha * K).alias("total_obs_s") + ) + + df2 = df2.with_columns([(pl.col(f"n_s_{k}") / pl.col("total_obs_s")).alias(f"p_s_{k}") for k in clusters]) + + df2 = df2.with_columns([ + (pl.col(f"p_s_{k}") / pl.col(f"p_s_{baseline_cluster}")).log().alias(f"log_ratio_{k}") for k in nonbase + ]) + + # Diagnostic: check for extreme log-ratios + for k in nonbase: + extreme_pos = df2.filter(pl.col(f"log_ratio_{k}") > 5).height + extreme_neg = df2.filter(pl.col(f"log_ratio_{k}") < -5).height + + if extreme_pos > 0 or extreme_neg > 0: + logger.warning( + " Cluster %s: %d block groups with log_ratio > 5, %d with log_ratio < -5", + k, + extreme_pos, + extreme_neg, + ) + logger.warning(" This suggests very imbalanced cluster distributions in some block groups") + + return df2, clusters, nonbase + + +def run_logratio_regressions( + reg_df: pl.DataFrame, + predictors: list[str], + baseline_cluster: str, + weight_col: str = "total_obs", + standardize: bool = False, + include_ols: bool = True, +) -> dict[str, object]: + """ + Fit separate WLS models for each non-baseline cluster: + log(p_k / p_base) ~ predictors + with weights = total_obs (household-day count in block group). + + Also fits OLS models (unweighted) as robustness check. + + Interpretation: + exp(beta) = multiplicative effect on the proportion ratio p_k/p_base + for a 1-unit increase in the predictor. + """ + logger.info("Running log-ratio regressions...") + logger.info(" Baseline cluster: %s", baseline_cluster) + logger.info(" Weighting by: %s", weight_col) + logger.info(" OLS robustness check: %s", include_ols) + + logratio_cols = sorted([c for c in reg_df.columns if c.startswith("log_ratio_")]) + if not logratio_cols: + raise ValueError("No log_ratio_ columns found. 
Did you call add_smoothed_proportions_and_logratios()?") + + X = reg_df.select(predictors).to_numpy().astype(np.float64) + w = reg_df.get_column(weight_col).to_numpy().astype(np.float64) + + # Drop invalid rows (NaNs / infs / nonpositive weights) + valid = np.isfinite(X).all(axis=1) & np.isfinite(w) & (w > 0) + if valid.sum() == 0: + raise ValueError("No valid rows after filtering missing predictors / invalid weights.") + + X = X[valid] + w = w[valid] + + scaler = None + if standardize: + logger.info(" Standardizing predictors...") + scaler = StandardScaler() + X = scaler.fit_transform(X) + else: + logger.info(" Using raw predictor units (no standardization).") + + X = sm.add_constant(X) + param_names = ["const", *predictors] + + results: dict[str, object] = { + "n_block_groups": int(reg_df["block_group_geoid"].n_unique()), + "n_rows": len(reg_df), + "n_valid_rows": int(valid.sum()), + "weight_col": weight_col, + "baseline_cluster": baseline_cluster, + "predictors": predictors, + "standardize": bool(standardize), + "models_wls": {}, + "models_ols": {}, + } + + if scaler is not None: + results["standardization"] = { + "type": "zscore", + "means": {p: float(m) for p, m in zip(predictors, scaler.mean_)}, + "scales": {p: float(s) for p, s in zip(predictors, scaler.scale_)}, + } + + summaries_wls = [] + summaries_ols = [] + + for col in logratio_cols: + k = col.replace("log_ratio_", "") + y = reg_df.get_column(col).to_numpy().astype(np.float64)[valid] + + if not np.isfinite(y).all(): + raise ValueError(f"Non-finite values in outcome {col}. Check smoothing / inputs.") + + # WLS model + model_wls = sm.WLS(y, X, weights=w) + fit_wls = model_wls.fit() + + coef_wls = {name: float(fit_wls.params[i]) for i, name in enumerate(param_names)} + se_wls = {name: float(fit_wls.bse[i]) for i, name in enumerate(param_names)} + pvals_wls = {name: float(fit_wls.pvalues[i]) for i, name in enumerate(param_names)} + mult_wls = {name: float(np.exp(fit_wls.params[i])) for i, name in enumerate(param_names)} + + key = f"cluster_{k}_vs_{baseline_cluster}" + results["models_wls"][key] = { + "outcome": f"log(p_{k}/p_{baseline_cluster})", + "nobs": int(fit_wls.nobs), + "r2": float(fit_wls.rsquared), + "adj_r2": float(fit_wls.rsquared_adj), + "coefficients": coef_wls, + "std_errors": se_wls, + "p_values": pvals_wls, + "multiplicative_effects": mult_wls, + } + + summaries_wls.append(f"\n{'=' * 80}\nWLS: {key}\n{'=' * 80}\n{fit_wls.summary().as_text()}") + logger.info(" WLS %s: R²=%.4f", key, float(fit_wls.rsquared)) + + # OLS model (robustness check) + if include_ols: + model_ols = sm.OLS(y, X) + fit_ols = model_ols.fit() + + coef_ols = {name: float(fit_ols.params[i]) for i, name in enumerate(param_names)} + se_ols = {name: float(fit_ols.bse[i]) for i, name in enumerate(param_names)} + pvals_ols = {name: float(fit_ols.pvalues[i]) for i, name in enumerate(param_names)} + mult_ols = {name: float(np.exp(fit_ols.params[i])) for i, name in enumerate(param_names)} + + results["models_ols"][key] = { + "outcome": f"log(p_{k}/p_{baseline_cluster})", + "nobs": int(fit_ols.nobs), + "r2": float(fit_ols.rsquared), + "adj_r2": float(fit_ols.rsquared_adj), + "coefficients": coef_ols, + "std_errors": se_ols, + "p_values": pvals_ols, + "multiplicative_effects": mult_ols, + } + + summaries_ols.append(f"\n{'=' * 80}\nOLS: {key}\n{'=' * 80}\n{fit_ols.summary().as_text()}") + logger.info(" OLS %s: R²=%.4f", key, float(fit_ols.rsquared)) + + results["all_model_summaries_wls"] = "\n".join(summaries_wls) + if include_ols: + 
results["all_model_summaries_ols"] = "\n".join(summaries_ols) + + return results + + +def generate_report_logratio( + results: dict[str, object], + dominance_stats: dict, + cluster_dist: pl.DataFrame, + output_path: Path, +) -> None: + """Generate a human-readable report emphasizing log-ratio interpretation.""" + lines = [ + "=" * 80, + "STAGE 2: BLOCK-GROUP LOG-RATIO REGRESSION RESULTS", + "=" * 80, + "", + "ANALYSIS UNIT: BLOCK GROUPS (HOUSEHOLD-DAY COMPOSITION)", + "-" * 80, + "Each row is a block group, with household-day counts aggregated into", + "cluster composition proportions. Outcomes are log-ratios vs a baseline:", + " y_k = log(p_k / p_baseline)", + "", + "Models are separate WLS regressions per non-baseline cluster, weighted by total_obs.", + "OLS models (unweighted) are included as robustness checks.", + "", + "MODEL OVERVIEW", + "-" * 80, + f"Block groups (total): {results['n_block_groups']:,}", + f"Block groups (valid): {results['n_valid_rows']:,}", + f"Rows: {results['n_rows']:,}", + f"Predictors: {len(results['predictors'])}", + f"Weight column: {results['weight_col']}", + f"Baseline cluster: {results['baseline_cluster']}", + f"Standardized predictors: {results.get('standardize', False)}", + "", + "HOUSEHOLD CLUSTER CONSISTENCY (interpretation context)", + "-" * 80, + "How consistently do households stay in one cluster across sampled days?", + "(This doesn't affect the regression - just useful context.)", + "", + f" Households: {dominance_stats['n_households']:,}", + f" Mean dominance: {dominance_stats['dominance_mean'] * 100:.1f}%", + f" Median dominance: {dominance_stats['dominance_median'] * 100:.1f}%", + f" Households >50% in one cluster: {dominance_stats['pct_above_50']:.1f}%", + f" Households >67% in one cluster: {dominance_stats['pct_above_67']:.1f}%", + f" Households >80% in one cluster: {dominance_stats['pct_above_80']:.1f}%", + "", + "CLUSTER DISTRIBUTION (by household-day observations, overall)", + "-" * 80, + ] + + for row in cluster_dist.iter_rows(named=True): + lines.append(f" Cluster {row['cluster']}: {row['n_obs']:,} obs ({row['pct']:.1f}%)") + + lines.extend([ + "", + "TOP PREDICTORS BY MODEL (WLS; by |coef|; *=p<0.05)", + "-" * 80, + "Interpretation: exp(coef) multiplies the proportion ratio p_k/p_baseline", + "for a 1-unit increase in the predictor (holding others constant).", + "", + ]) + + models_wls = results["models_wls"] + models_ols = results.get("models_ols", {}) + predictors = results["predictors"] + + for model_key in sorted(models_wls.keys()): + m_wls = models_wls[model_key] + coefs_wls = m_wls["coefficients"] + pvals_wls = m_wls["p_values"] + mult_wls = m_wls["multiplicative_effects"] + + lines.append(f"\n{model_key}") + lines.append("-" * 80) + lines.append(f"WLS R²={m_wls['r2']:.4f}, Adj R²={m_wls['adj_r2']:.4f}") + + if model_key in models_ols: + m_ols = models_ols[model_key] + lines.append(f"OLS R²={m_ols['r2']:.4f}, Adj R²={m_ols['adj_r2']:.4f} (robustness check)") + + lines.append("") + + sorted_preds = sorted( + [(p, coefs_wls[p]) for p in predictors], + key=lambda x: abs(x[1]), + reverse=True, + )[:5] + + for pred, coef in sorted_preds: + star = "*" if pvals_wls[pred] < 0.05 else "" + direction = "↑" if coef > 0 else "↓" + + line = f" {direction} {pred:<30} WLS: mult={mult_wls[pred]:.3f}, coef={coef:>7.4f}, p={pvals_wls[pred]:.3g}{star}" + + # Show OLS comparison if available + if model_key in models_ols: + coef_ols = models_ols[model_key]["coefficients"][pred] + diff = coef - coef_ols + line += f" | OLS: coef={coef_ols:>7.4f} 
(Δ={diff:>6.3f})" + + lines.append(line) + + lines.append("\n" + "=" * 80) + lines.append("") + lines.append("NOTES:") + lines.append("- WLS models weight by total household-day observations per block group") + lines.append("- OLS models are unweighted (robustness check)") + lines.append("- Large WLS-OLS differences suggest results driven by large block groups") + lines.append("- Multiplicative effect > 1.0 means predictor increases p_k/p_baseline") + lines.append("- Multiplicative effect < 1.0 means predictor decreases p_k/p_baseline") + lines.append("=" * 80) + + text = "\n".join(lines) + output_path.write_text(text, encoding="utf-8") + logger.info("Report saved to %s", output_path) + print("\n" + text) + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Stage 2: Block-group-level log-ratio regression using household-day units.", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + parser.add_argument("--clusters", type=Path, required=True, help="cluster_assignments.parquet") + parser.add_argument("--crosswalk", type=Path, required=True, help="ZIP+4 → block-group crosswalk") + parser.add_argument( + "--census-cache", + type=Path, + default=Path("data/reference/census_17_2023.parquet"), + ) + parser.add_argument("--fetch-census", action="store_true", help="Force re-fetch Census data") + parser.add_argument("--state-fips", default="17") + parser.add_argument("--acs-year", type=int, default=2023) + parser.add_argument( + "--min-obs-per-bg", + type=int, + default=50, + help="Minimum household-day observations per block group (default: 50)", + ) + parser.add_argument("--predictors", nargs="+", default=DEFAULT_PREDICTORS, help="Predictor columns") + parser.add_argument( + "--output-dir", + type=Path, + default=Path("data/clustering/results/stage2_blockgroups_logratio"), + ) + parser.add_argument( + "--standardize", + action="store_true", + help="Standardize predictors before regression (default: use raw units).", + ) + parser.add_argument( + "--alpha", + type=float, + default=0.5, + help="Pseudocount smoothing parameter for proportions (default: 0.5)", + ) + parser.add_argument( + "--baseline-cluster", + type=str, + default=None, + help="Baseline cluster label (default: most frequent cluster by household-day observations)", + ) + parser.add_argument( + "--no-ols", + action="store_true", + help="Skip OLS robustness check (only run WLS)", + ) + + args = parser.parse_args() + + if not args.clusters.exists(): + logger.error("Cluster assignments not found: %s", args.clusters) + return 1 + if not args.crosswalk.exists(): + logger.error("Crosswalk not found: %s", args.crosswalk) + return 1 + + args.output_dir.mkdir(parents=True, exist_ok=True) + + print("=" * 80) + print("STAGE 2: BLOCK-GROUP LOG-RATIO REGRESSION (HOUSEHOLD-DAY UNITS)") + print("=" * 80) + + household_days, dominance_stats = load_cluster_assignments_household_day(args.clusters) + + # Baseline cluster (string form to match wide column naming) + baseline_cluster = args.baseline_cluster or choose_baseline_cluster_from_household_days(household_days) + baseline_cluster = str(baseline_cluster) + logger.info("Using baseline cluster: %s", baseline_cluster) + + zip_codes = household_days["zip_code"].unique().to_list() + crosswalk = load_crosswalk_one_to_one(args.crosswalk, zip_codes) + household_days_bg = attach_block_groups_to_household_days(household_days, crosswalk) + + bg_comp = aggregate_blockgroup_cluster_composition(household_days_bg) + + census_df = fetch_or_load_census( + 
cache_path=args.census_cache, + state_fips=args.state_fips, + acs_year=args.acs_year, + force_fetch=args.fetch_census, + ) + logger.info(" Census: %s block groups, %s columns", f"{len(census_df):,}", len(census_df.columns)) + + census_df = create_derived_variables(census_df) + + demo_df = attach_census_to_blockgroups(bg_comp, census_df) + + reg_df, predictors = prepare_regression_dataset_wide( + demo_df=demo_df, + predictors=args.predictors, + min_obs_per_bg=args.min_obs_per_bg, + ) + + if reg_df.is_empty(): + logger.error("No data after filtering") + return 1 + + # Add smoothed proportions + log-ratios + reg_df2, clusters, nonbase = add_smoothed_proportions_and_logratios( + reg_df, + baseline_cluster=baseline_cluster, + alpha=args.alpha, + ) + logger.info("Clusters detected: %s (baseline=%s, non-baseline=%s)", clusters, baseline_cluster, nonbase) + + # Save regression dataset + reg_df2.write_parquet(args.output_dir / "regression_data_blockgroups_wide.parquet") + logger.info("Saved regression data to %s", args.output_dir / "regression_data_blockgroups_wide.parquet") + + # Fit models + results = run_logratio_regressions( + reg_df=reg_df2, + predictors=predictors, + baseline_cluster=baseline_cluster, + weight_col="total_obs", + standardize=args.standardize, + include_ols=not args.no_ols, + ) + + results["dominance_stats"] = dominance_stats + results["alpha"] = float(args.alpha) + results["k"] = len(clusters) + results["clusters"] = clusters + results["nonbaseline_clusters"] = nonbase + + # Write outputs + all_summaries_wls = results.pop("all_model_summaries_wls") + all_summaries_ols = results.pop("all_model_summaries_ols", None) + + with open(args.output_dir / "regression_results_logratio_blockgroups.json", "w") as f: + json.dump(results, f, indent=2) + + (args.output_dir / "statsmodels_summaries_wls.txt").write_text(all_summaries_wls, encoding="utf-8") + + if all_summaries_ols: + (args.output_dir / "statsmodels_summaries_ols.txt").write_text(all_summaries_ols, encoding="utf-8") + + # Cluster distribution overall (by household-day observations) + cluster_dist = ( + household_days.group_by("cluster") + .agg(pl.len().alias("n_obs")) + .sort("cluster") + .with_columns((pl.col("n_obs") / pl.col("n_obs").sum() * 100).alias("pct")) + ) + + generate_report_logratio( + results=results, + dominance_stats=dominance_stats, + cluster_dist=cluster_dist, + output_path=args.output_dir / "regression_report_logratio_blockgroups.txt", + ) + + print(f"\nOutputs saved to: {args.output_dir}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/docs/index.qmd b/docs/index.qmd index 5ca3db2..8307177 100644 --- a/docs/index.qmd +++ b/docs/index.qmd @@ -4,26 +4,30 @@ ### Load profiles -Our ComEd and Ameren Illinois data consists of kWh load profiles in 30-minute increments for the complete set of households across the entire Chicago metro area. To preserve anonymity, TK. +Our ComEd and Ameren Illinois data consists of kWh load profiles in 30-minute increments for the complete set of households across the entire ComEd service area. To preserve anonymity per rules set by the Illinois Chamber of Commerce, customer data can only be included in a data release from a utility company if it passes a screening process. In this case, a customer’s individual data cannot be released if there are 15 or fewer customers in the given geographic area, or if they represent more than 15% of that area’s load. In our case, the geographic area of interest is the nine-digit Zip+4 postal code. 
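+
+As a minimal sketch of what this 15/15 screen implies (hypothetical column names; the actual screening is performed by ComEd before any data release), a customer's data is releasable only if its ZIP+4 has more than 15 customers and the customer accounts for at most 15% of that ZIP+4's load:
+
+```python
+import polars as pl
+
+# Hypothetical per-customer table: one row per customer with its ZIP+4 and monthly kWh.
+usage = pl.DataFrame({
+    "account_identifier": ["a", "b", "c"],
+    "zip_code": ["60601-0001", "60601-0001", "60601-0002"],
+    "monthly_kwh": [250.0, 300.0, 400.0],
+})
+
+releasable = (
+    usage.with_columns([
+        pl.col("account_identifier").count().over("zip_code").alias("n_customers_in_zip4"),
+        (pl.col("monthly_kwh") / pl.col("monthly_kwh").sum().over("zip_code")).alias("share_of_zip4_load"),
+    ])
+    # Keep a customer only if the ZIP+4 has more than 15 customers and the
+    # customer contributes at most 15% of that ZIP+4's total load.
+    .filter((pl.col("n_customers_in_zip4") > 15) & (pl.col("share_of_zip4_load") <= 0.15))
+)
+```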
-As a result, we only get consistent household identifiers for a given calendar month. Since Chicago's grid is summer-peaking, we choose load profiles for the month of TK. We denote load profiles as $L_i$ (for each $i$ household). Each $L_i$ is a TK point time series, for every 30 minutes in the month. +# The Ameren data was grabbed for a hypothetical second project with CUB. It's not part of the scope of this round of work--CUB explicitly asked for the ComEd data first and separately. -### Demographic information +Furthermore, even when individual customers’ usage data is provided, the identification number associated with each customer is not retained month to month. In other words, while the usage data may feature the same customers in September as they did in August, ComEd assigns each customer a new account ID for each month. As a result, we only get consistent household identifiers for a given calendar month. Since Illinois’s grid load peaked in July 2023, we choose load profiles for the month of TK. We denote load profiles as $L_i$ (for each $i$ household). Each $L_i$ is a TK point time series, for every 30 minutes in the month (48 half-hour readings per day over the course of 30 days). -The highest spatial resolution demographic data available for our analysis is the 5-year TK (2024?) US Census Bureau American Community Survey (2024 ACS) at the block group level. From this, we derive a number of block-group level demographic features: +### Demographic information +The highest spatial resolution demographic data available for our analysis is the 5-year 2023 US Census Bureau American Community Survey (2023 ACS) and the decennial 2020 Census Supplemental Demographic and Housing Characteristics File (2020 DHS) at the block group level. From this, we derive a number of block-group level demographic features: * TK ### Geographical crosswalk -Load profiles are identified by ZIP+4. Our demographic information from the TK (2024?) ACS is available at the Census block group level. To join the two, we use a crosswalk from Melissa. TK details of crosswalk. +Load profiles are identified by ZIP+4. Our demographic information from the 2023 ACS and 2020 DHS is available at the Census block group level. To join the two, we use a crosswalk from Melissa. That crosswalk matches every Zip+4 postal code in Illinois to the Census Block that it was associated with in 2023. From here we aggregate the Census Blocks to their Block Groups, allowing them to be associated with our demographic information. Thus, we are able to characterize Block Groups both demographically and by the usage data of their residents. -TK ZIP+4 and Census Block group spaital overlap to show that it's a reasonable mapping. +TK ZIP+4 and Census Block group spatial overlap to show that it’s a reasonable mapping. ### Clusters We cluster the load profiles into clusters via k-means. We use Euclidean distance on normalized load profiles, to focus on load shape rather than overall levels. We are aiming for a small number of clusters (4-10) to aid in interpretation. We chose $k=\textrm{TK}$ via the gap statistic [@tibshiraniEstimatingNumberClusters2001], using the `GapStatistics` package (TK citation). +# current code compares silhouette scores to pick the k that gives the tightest, best-separated clusters. Given skepticism about the initial paper's findings +# (as in... maybe there's nothing to see here?) your approach might be better.
My limited understanding is that the gap score compares to a null model, # while my code only compares clusters that it generates itself. Happy to change + TK details on each cluster, and brief description of each shape. TK selection of cluster 1 as baseline. diff --git a/scripts/process_csvs_batched_optimized.py b/scripts/process_csvs_batched_optimized.py new file mode 100644 index 0000000..6e589f7 --- /dev/null +++ b/scripts/process_csvs_batched_optimized.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +""" +Memory-optimized CSV processing for large file counts. + +Processes CSV files in batches and sub-batches to avoid OOM / huge lazy plans. + +Usage: + python process_csvs_batched_optimized.py \ + --input-dir data/validation_runs/202308_50k/samples \ + --output data/validation_runs/202308_50k/processed_combined.parquet \ + --batch-size 5000 \ + --sub-batch-size 250 +""" + +import argparse +import logging +from pathlib import Path + +import polars as pl + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +def process_csv_subbatch_to_parquet( + csv_files: list[Path], + batch_num: int, + sub_num: int, + temp_dir: Path, +) -> Path: + """ + Process a sub-batch of CSV files and write to a temporary parquet file. + """ + from smart_meter_analysis.aws_loader import ( + COMED_SCHEMA, + add_time_columns_lazy, + transform_wide_to_long_lazy, + ) + + logger.info(" Sub-batch %d.%d: %d files", batch_num, sub_num, len(csv_files)) + + lazy_frames: list[pl.LazyFrame] = [] + for i, csv_path in enumerate(csv_files, 1): + if i % 200 == 0: + logger.info(" Scanned %d/%d files in sub-batch %d.%d", i, len(csv_files), batch_num, sub_num) + try: + lf = pl.scan_csv( + str(csv_path), + schema_overrides=COMED_SCHEMA, + ignore_errors=True, + ) + lf = transform_wide_to_long_lazy(lf) + + # IMPORTANT: updated signature (no day_mode) + lf = add_time_columns_lazy(lf) + + lazy_frames.append(lf) + except Exception as exc: + logger.warning("Failed to scan %s: %s", csv_path.name, exc) + + if not lazy_frames: + raise ValueError(f"No files successfully scanned in sub-batch {batch_num}.{sub_num}") + + sub_output = temp_dir / f"batch_{batch_num:04d}_sub_{sub_num:04d}.parquet" + + # Combine this sub-batch and write immediately + pl.concat(lazy_frames, how="diagonal_relaxed").sink_parquet(sub_output) + + logger.info(" Sub-batch %d.%d complete: %s", batch_num, sub_num, sub_output) + return sub_output + + +def process_csv_batch_to_parquet( + csv_files: list[Path], + batch_num: int, + temp_dir: Path, + sub_batch_size: int, +) -> Path: + """ + Process a batch of CSV files by splitting into sub-batches and writing a single + batch parquet composed from the sub-batch parquets. 
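+
+    Sub-batching keeps the lazy query plan bounded: each sub-batch is scanned,
+    transformed, and sunk to its own temporary parquet before the next sub-batch is
+    touched, so peak memory scales with sub_batch_size rather than with the full batch.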
+ """ + logger.info("Processing batch %d: %d files", batch_num, len(csv_files)) + + sub_files: list[Path] = [] + for sub_num, i in enumerate(range(0, len(csv_files), sub_batch_size), 1): + sub = csv_files[i : i + sub_batch_size] + sub_file = process_csv_subbatch_to_parquet(sub, batch_num, sub_num, temp_dir) + sub_files.append(sub_file) + + batch_output = temp_dir / f"batch_{batch_num:04d}.parquet" + logger.info(" Concatenating %d sub-batches into %s", len(sub_files), batch_output) + + pl.concat([pl.scan_parquet(str(f)) for f in sub_files], how="diagonal_relaxed").sink_parquet(batch_output) + + # Clean up sub-batch files + for f in sub_files: + f.unlink(missing_ok=True) + + logger.info("Batch %d complete: %s", batch_num, batch_output) + return batch_output + + +def main() -> int: + parser = argparse.ArgumentParser(description="Process CSV files in memory-safe batches") + parser.add_argument("--input-dir", type=Path, required=True, help="Directory containing CSV files") + parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + parser.add_argument("--batch-size", type=int, default=5000, help="Files per batch (default: 5000)") + parser.add_argument( + "--sub-batch-size", + type=int, + default=250, + help="Files per sub-batch within each batch (default: 250).", + ) + + args = parser.parse_args() + + csv_files = sorted(args.input_dir.glob("*.csv")) + logger.info("Found %d CSV files", len(csv_files)) + + if not csv_files: + logger.error("No CSV files found in %s", args.input_dir) + return 1 + + temp_dir = args.output.parent / "temp_batches" + temp_dir.mkdir(parents=True, exist_ok=True) + + batch_files: list[Path] = [] + for batch_num, i in enumerate(range(0, len(csv_files), args.batch_size), 1): + batch = csv_files[i : i + args.batch_size] + batch_file = process_csv_batch_to_parquet( + csv_files=batch, + batch_num=batch_num, + temp_dir=temp_dir, + sub_batch_size=args.sub_batch_size, + ) + batch_files.append(batch_file) + + logger.info("Concatenating %d batch files into final output...", len(batch_files)) + args.output.parent.mkdir(parents=True, exist_ok=True) + + pl.concat([pl.scan_parquet(str(f)) for f in batch_files], how="diagonal_relaxed").sink_parquet(args.output) + + row_count = pl.scan_parquet(args.output).select(pl.len()).collect()[0, 0] + logger.info("Success! 
Wrote %s records to %s", f"{row_count:,}", args.output) + + logger.info("Cleaning up temporary batch files...") + for f in batch_files: + f.unlink(missing_ok=True) + temp_dir.rmdir() + + logger.info("Done!") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/smart_meter_analysis/aws_loader.py b/smart_meter_analysis/aws_loader.py index 38351dc..0c7ea36 100644 --- a/smart_meter_analysis/aws_loader.py +++ b/smart_meter_analysis/aws_loader.py @@ -173,7 +173,6 @@ def add_time_columns_lazy(lf: pl.LazyFrame, day_mode: str = "calendar") -> pl.La if day_mode not in {"calendar", "billing"}: raise ValueError("day_mode must be 'calendar' or 'billing'") - # Fixed DST dates currently encoded DST_SPRING_2023 = _date(2023, 3, 12) DST_FALL_2023 = _date(2023, 11, 5) @@ -182,21 +181,22 @@ def add_time_columns_lazy(lf: pl.LazyFrame, day_mode: str = "calendar") -> pl.La if day_mode == "calendar": date_expr = dt.dt.date() else: - # Billing day: assign midnight rows (00:00) back to the previous date date_expr = ( pl.when((dt.dt.hour() == 0) & (dt.dt.minute() == 0)) .then((dt - pl.duration(days=1)).dt.date()) .otherwise(dt.dt.date()) ) + weekday_expr = pl.col("date").dt.weekday() # Polars: Mon=1 ... Sun=7 + return ( lf.with_columns([ date_expr.alias("date"), dt.dt.hour().alias("hour"), ]) .with_columns([ - pl.col("date").dt.weekday().alias("weekday"), - (pl.col("date").dt.weekday() >= 5).alias("is_weekend"), + weekday_expr.alias("weekday"), + (weekday_expr >= 6).alias("is_weekend"), ]) .with_columns([ (pl.col("date") == DST_SPRING_2023).alias("is_spring_forward_day"), diff --git a/smart_meter_analysis/manifests.py b/smart_meter_analysis/manifests.py index fd6da8d..ef75316 100644 --- a/smart_meter_analysis/manifests.py +++ b/smart_meter_analysis/manifests.py @@ -136,11 +136,11 @@ def ensure_date_manifest(input_path: Path) -> Path: """ Create or load date manifest in a memory-safe way. - The manifest contains one row per date with representative weekend/weekday - flags. This is small in practice (~31 rows for one month), but we still - build it via streaming group_by for consistency. + The manifest contains one row per date with weekend/weekday + flags derived from weekday (source of truth). """ - _validate_input_has_columns(input_path, ["date", "is_weekend", "weekday"]) + # Only require date + weekday; is_weekend will be derived + _validate_input_has_columns(input_path, ["date", "weekday"]) manifest_path = input_path.parent / f"{input_path.stem}_dates.parquet" @@ -161,20 +161,22 @@ def ensure_date_manifest(input_path: Path) -> Path: manifest_path, ) - # Build manifest using streaming-friendly group_by logger.info("Building date manifest from %s (streaming group_by)...", input_path) log_memory("before date manifest") lf = pl.scan_parquet(input_path) + # Build: date -> weekday (first) -> is_weekend (derived) dates_df = ( - lf.select(["date", "is_weekend", "weekday"]) + lf.select(["date", "weekday"]) .filter(pl.col("date").is_not_null()) .group_by("date") .agg( - pl.first("is_weekend").alias("is_weekend"), pl.first("weekday").alias("weekday"), ) + .with_columns( + (pl.col("weekday") >= 6).alias("is_weekend") # 6=Sat, 7=Sun + ) .collect(engine="streaming") .sort("date") ) diff --git a/tests/diagnose_bg_density_v2.py b/tests/diagnose_bg_density_v2.py new file mode 100644 index 0000000..b36d397 --- /dev/null +++ b/tests/diagnose_bg_density_v2.py @@ -0,0 +1,360 @@ +#!/usr/bin/env python3 +""" +Diagnostic: Check block-group sampling density for Stage 2 regression. 
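+
+Three checks are reported: household density (unique households per block group),
+observation density (household-days per block group, including how many block groups
+would survive the Stage 2 ">= 50 obs" filter), and the variance of cluster shares
+across block groups.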
+ +This script analyzes whether your household sample is dense enough at the +block-group level to detect meaningful demographic patterns. + +Usage: + python diagnose_bg_density.py \ + --clusters data/clustering/results/cluster_assignments.parquet \ + --crosswalk data/reference/2023_comed_zip4_census_crosswalk.txt \ + --output diagnostics.txt +""" + +from __future__ import annotations + +import argparse +from pathlib import Path + +import polars as pl + + +def inspect_cluster_file(clusters_path: Path) -> None: + """Inspect what's actually in the cluster assignments file.""" + print("=" * 70) + print("CLUSTER FILE INSPECTION") + print("=" * 70) + + df = pl.read_parquet(clusters_path) + + print(f"\nFile: {clusters_path}") + print(f"Rows: {len(df):,}") + print(f"Columns: {df.columns}") + print("\nSchema:") + for col, dtype in df.schema.items(): + print(f" {col:30s} {dtype}") + + print("\nFirst few rows:") + print(df.head()) + + print("\n" + "=" * 70) + + +def load_and_join_to_blockgroups( + clusters_path: Path, + crosswalk_path: Path, +) -> pl.DataFrame: + """Join cluster assignments to block groups.""" + print("\nLoading cluster assignments...") + clusters = pl.read_parquet(clusters_path) + print(f" {len(clusters):,} household-day observations") + + # Check which ID columns are present + id_col = None + if "account_identifier" in clusters.columns: + id_col = "account_identifier" + elif "household_id" in clusters.columns: + id_col = "household_id" + elif "meter_id" in clusters.columns: + id_col = "meter_id" + else: + print(" ⚠️ WARNING: No household identifier column found!") + print(f" Available columns: {clusters.columns}") + id_col = None + + if id_col: + print(f" {clusters[id_col].n_unique():,} unique households (using column: {id_col})") + + if "zip_code" not in clusters.columns: + raise ValueError(f"'zip_code' column not found in {clusters_path}") + + if "cluster" not in clusters.columns: + raise ValueError(f"'cluster' column not found in {clusters_path}") + + print(f" {clusters['cluster'].n_unique()} clusters") + + print("\nLoading crosswalk...") + zip_codes = clusters["zip_code"].unique().to_list() + + crosswalk = ( + pl.scan_csv(crosswalk_path, separator="\t", infer_schema_length=10000) + .with_columns([ + (pl.col("Zip").cast(pl.Utf8).str.zfill(5) + "-" + pl.col("Zip4").cast(pl.Utf8).str.zfill(4)).alias( + "zip_code" + ), + pl.col("CensusKey2023").cast(pl.Utf8).str.zfill(15).str.slice(0, 12).alias("block_group_geoid"), + ]) + .filter(pl.col("zip_code").is_in(zip_codes)) + .select(["zip_code", "block_group_geoid"]) + .collect() + ) + + print(f" {crosswalk['zip_code'].n_unique():,} ZIP+4s matched") + + # Check for fan-out + fanout = crosswalk.group_by("zip_code").agg(pl.n_unique("block_group_geoid").alias("n_bg")) + max_fanout = fanout["n_bg"].max() + if max_fanout > 1: + pct_fanout = (fanout.filter(pl.col("n_bg") > 1).height / len(fanout)) * 100 + print(f" ⚠️ {pct_fanout:.1f}% of ZIP+4s map to multiple block groups (max={max_fanout})") + + print("\nJoining to block groups...") + df = clusters.join(crosswalk, on="zip_code", how="left") + + missing = df.filter(pl.col("block_group_geoid").is_null()).height + if missing > 0: + print(f" ⚠️ {missing:,} observations ({missing / len(df) * 100:.1f}%) missing block group") + + df = df.filter(pl.col("block_group_geoid").is_not_null()) + print(f" ✓ {len(df):,} observations across {df['block_group_geoid'].n_unique():,} block groups") + + # Store the ID column name for later use + df = df.with_columns(pl.lit(id_col).alias("_id_col_name")) + + 
return df + + +def diagnose_household_density(df: pl.DataFrame) -> dict: + """Check households per block group.""" + print("\n" + "=" * 70) + print("DIAGNOSTIC 1: HOUSEHOLD DENSITY PER BLOCK GROUP") + print("=" * 70) + + # Get the ID column name (stored during join) + id_col = df["_id_col_name"][0] if "_id_col_name" in df.columns else None + + if not id_col or id_col not in df.columns: + print("\n⚠️ WARNING: Cannot compute household density - no household ID column") + print(" Skipping this diagnostic.") + return {} + + hh_per_bg = df.group_by("block_group_geoid").agg(pl.col(id_col).n_unique().alias("n_households")) + + stats = { + "n_block_groups": hh_per_bg.height, + "mean_hh_per_bg": hh_per_bg["n_households"].mean(), + "median_hh_per_bg": hh_per_bg["n_households"].median(), + "min_hh_per_bg": hh_per_bg["n_households"].min(), + "max_hh_per_bg": hh_per_bg["n_households"].max(), + "p10_hh_per_bg": hh_per_bg["n_households"].quantile(0.10), + "p90_hh_per_bg": hh_per_bg["n_households"].quantile(0.90), + } + + print(f"\nBlock groups: {stats['n_block_groups']:,}") + print("Households per block group:") + print(f" Mean: {stats['mean_hh_per_bg']:.1f}") + print(f" Median: {stats['median_hh_per_bg']:.1f}") + print(f" Min: {stats['min_hh_per_bg']}") + print(f" Max: {stats['max_hh_per_bg']}") + print(f" P10: {stats['p10_hh_per_bg']:.1f}") + print(f" P90: {stats['p90_hh_per_bg']:.1f}") + + # Distribution + print("\nDistribution:") + dist = ( + hh_per_bg.with_columns( + pl.when(pl.col("n_households") == 1) + .then(pl.lit("1 household")) + .when(pl.col("n_households") == 2) + .then(pl.lit("2 households")) + .when(pl.col("n_households").is_between(3, 5)) + .then(pl.lit("3-5 households")) + .when(pl.col("n_households").is_between(6, 10)) + .then(pl.lit("6-10 households")) + .when(pl.col("n_households").is_between(11, 20)) + .then(pl.lit("11-20 households")) + .otherwise(pl.lit("20+ households")) + .alias("bucket") + ) + .group_by("bucket") + .agg(pl.len().alias("n_bg")) + .sort("n_bg", descending=True) + ) + + for row in dist.iter_rows(named=True): + pct = row["n_bg"] / stats["n_block_groups"] * 100 + print(f" {row['bucket']:20s}: {row['n_bg']:5,} BGs ({pct:5.1f}%)") + + # Assessment + print("\nASSESSMENT:") + if stats["median_hh_per_bg"] < 3: + print(" ❌ CRITICAL: Median < 3 households per block group") + print(" → Most block groups have too few households for stable cluster shares") + print(" → Stage 2 regression will have very weak signal") + elif stats["median_hh_per_bg"] < 10: + print(" ⚠️ WARNING: Median < 10 households per block group") + print(" → Cluster shares will be noisy") + print(" → Consider increasing sample size or aggregating to ZIP-level") + else: + print(" ✓ GOOD: Median ≥ 10 households per block group") + print(" → Should have reasonable signal for Stage 2") + + return stats + + +def diagnose_obs_density(df: pl.DataFrame) -> dict: + """Check household-day observations per block group.""" + print("\n" + "=" * 70) + print("DIAGNOSTIC 2: OBSERVATION DENSITY PER BLOCK GROUP") + print("=" * 70) + + obs_per_bg = df.group_by("block_group_geoid").agg(pl.len().alias("n_obs")) + + stats = { + "mean_obs_per_bg": obs_per_bg["n_obs"].mean(), + "median_obs_per_bg": obs_per_bg["n_obs"].median(), + "min_obs_per_bg": obs_per_bg["n_obs"].min(), + "max_obs_per_bg": obs_per_bg["n_obs"].max(), + } + + print("\nObservations (household-days) per block group:") + print(f" Mean: {stats['mean_obs_per_bg']:.1f}") + print(f" Median: {stats['median_obs_per_bg']:.1f}") + print(f" Min: {stats['min_obs_per_bg']}") + 
print(f" Max: {stats['max_obs_per_bg']}") + + # After Stage 2 filtering (≥50 obs, ≥2 clusters) + filtered = obs_per_bg.filter(pl.col("n_obs") >= 50) + n_filtered = filtered.height + pct_surviving = n_filtered / obs_per_bg.height * 100 if obs_per_bg.height > 0 else 0 + + print("\nAfter Stage 2 filtering (≥50 obs per BG):") + print(f" {n_filtered:,} block groups ({pct_surviving:.1f}%) survive") + + if pct_surviving < 20: + print("\n ⚠️ WARNING: <20% of block groups survive filtering") + print(" → You're throwing away most of your data") + print(" → Consider lowering threshold or increasing sample size") + + return stats + + +def diagnose_cluster_share_variance(df: pl.DataFrame) -> dict: + """Check variance in cluster shares across block groups.""" + print("\n" + "=" * 70) + print("DIAGNOSTIC 3: CLUSTER SHARE VARIANCE") + print("=" * 70) + + # Compute cluster shares per block group + bg_counts = df.group_by(["block_group_geoid", "cluster"]).agg(pl.len().alias("n_obs")) + + bg_totals = df.group_by("block_group_geoid").agg(pl.len().alias("total_obs")) + + shares = bg_counts.join(bg_totals, on="block_group_geoid").with_columns( + (pl.col("n_obs") / pl.col("total_obs")).alias("cluster_share") + ) + + # Pivot to wide format for variance calculation + wide = shares.pivot( + index="block_group_geoid", + columns="cluster", + values="cluster_share", + ).fill_null(0) + + cluster_cols = [c for c in wide.columns if c != "block_group_geoid"] + + print("\nCluster share variance across block groups:") + stats = {} + for col in sorted(cluster_cols): + if col in wide.columns: + var = wide[col].var() + mean = wide[col].mean() + stats[col] = {"mean": mean, "variance": var} + print(f" Cluster {col}: mean={mean:.3f}, variance={var:.4f}") + + if stats: + avg_var = sum(s["variance"] for s in stats.values()) / len(stats) + + print(f"\nAverage variance: {avg_var:.4f}") + + print("\nASSESSMENT:") + if avg_var < 0.01: + print(" ❌ CRITICAL: Variance < 0.01") + print(" → Cluster shares barely differ across block groups") + print(" → No demographic signal can be detected") + print(" → MUST increase sample size or change aggregation level") + elif avg_var < 0.02: + print(" ⚠️ WARNING: Variance < 0.02") + print(" → Weak signal; demographic effects will be hard to detect") + else: + print(" ✓ GOOD: Variance ≥ 0.02") + print(" → Sufficient variation for regression") + + return stats + + +def generate_recommendations( + hh_stats: dict, + obs_stats: dict, + share_stats: dict, +) -> None: + """Generate actionable recommendations.""" + print("\n" + "=" * 70) + print("RECOMMENDATIONS") + print("=" * 70) + + if not hh_stats: + print("\n⚠️ Could not assess household density (no ID column)") + print(" Assess based on observation density instead.") + return + + median_hh = hh_stats.get("median_hh_per_bg", 0) + + if median_hh < 3: + print("\n🔴 CRITICAL ISSUE: Sample too sparse for block-group analysis") + print("\nOptions:") + print(" 1. Increase household sample to 50k-100k+") + print(" 2. Switch to ZIP-level or ZIP+4-level aggregation") + print(" 3. Use stratified sampling (population-weighted by block group)") + print(" 4. Aggregate to county-level if only interested in broad patterns") + + elif median_hh < 10: + print("\n⚠️ WARNING: Marginal sample density") + print("\nOptions:") + print(" 1. Increase sample to 30k-50k households") + print(" 2. Consider ZIP-level aggregation for more stable estimates") + print(" 3. 
Use hierarchical modeling to pool information across similar BGs") + + else: + print("\n✓ Sample density looks reasonable") + print("\nOptional improvements:") + print(" 1. Still consider stratified sampling for better coverage") + print(" 2. Add more days if household-day counts are low") + + print("\n" + "=" * 70) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Diagnose block-group sampling density for Stage 2") + parser.add_argument("--clusters", type=Path, required=True, help="Path to cluster_assignments.parquet") + parser.add_argument("--crosswalk", type=Path, required=True, help="Path to ZIP+4 crosswalk file") + parser.add_argument("--inspect-only", action="store_true", help="Only inspect the cluster file schema and exit") + parser.add_argument("--output", type=Path, default=None, help="Optional: save report to file") + + args = parser.parse_args() + + # Inspect the cluster file first + inspect_cluster_file(args.clusters) + + if args.inspect_only: + return + + # Load and join + df = load_and_join_to_blockgroups(args.clusters, args.crosswalk) + + # Run diagnostics + hh_stats = diagnose_household_density(df) + obs_stats = diagnose_obs_density(df) + share_stats = diagnose_cluster_share_variance(df) + + # Recommendations + generate_recommendations(hh_stats, obs_stats, share_stats) + + # TODO: Save to file if requested + if args.output: + print(f"\nReport saved to: {args.output}") + + +if __name__ == "__main__": + main()