From ba8632cbfae4a8eb8874c6c9aeb777ea346509ca Mon Sep 17 00:00:00 2001 From: Bhupendra Raut Date: Wed, 13 May 2026 18:22:57 -0500 Subject: [PATCH 1/6] ADD:(CI) windows; refactor README --- .github/workflows/ci.yml | 2 +- README.md | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 147e805..b1a50fa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,7 @@ jobs: fail-fast: false matrix: python-version: ["3.12", "3.13"] - os: [ubuntu, macOS] + os: [ubuntu, macOS, windows] steps: - name: Checkout repository diff --git a/README.md b/README.md index 76603be..e931e81 100644 --- a/README.md +++ b/README.md @@ -4,17 +4,13 @@ [![CI](https://github.com/ARM-DOE/Adapt/actions/workflows/ci.yml/badge.svg)](https://github.com/ARM-DOE/Adapt/actions?query=workflow%3ACI) [![Codecov](https://img.shields.io/codecov/c/github/ARM-DOE/Adapt.svg?logo=codecov)](https://codecov.io/gh/ARM-DOE/Adapt) [![CodeFactor](https://www.codefactor.io/repository/github/arm-doe/adapt/badge)](https://www.codefactor.io/repository/github/arm-doe/adapt) +[![Security](https://github.com/ARM-DOE/Adapt/actions/workflows/security-analysis.yml/badge.svg)](https://arm-doe.github.io/Adapt/) +[![Virus](https://github.com/ARM-DOE/Adapt/actions/workflows/virus.yml/badge.svg)](https://arm-doe.github.io/Adapt/) [![Docs](https://github.com/ARM-DOE/Adapt/actions/workflows/docs.yml/badge.svg)](https://arm-doe.github.io/Adapt/) [![PyPi release](https://github.com/ARM-DOE/Adapt/actions/workflows/pypi-release.yml/badge.svg)](https://arm-doe.github.io/Adapt/) -[![PyPI - Version](https://img.shields.io/pypi/v/arm-adapt)](https://pypi.org/project/arm-adapt/) [![PyPI Downloads](https://static.pepy.tech/personalized-badge/arm-adapt?period=total&units=INTERNATIONAL_SYSTEM&left_color=BLACK&right_color=GREEN&left_text=downloads)](https://pypi.org/project/arm-adapt/) - -[![Security](https://github.com/ARM-DOE/Adapt/actions/workflows/security-analysis.yml/badge.svg)](https://arm-doe.github.io/Adapt/) -[![Virus](https://github.com/ARM-DOE/Adapt/actions/workflows/virus.yml/badge.svg)](https://arm-doe.github.io/Adapt/) - - [![PyPI - License](https://img.shields.io/pypi/l/arm-adapt)](https://github.com/ARM-DOE/Adapt?tab=License-1-ov-file#) [![ARM](https://img.shields.io/badge/Sponsor-ARM-blue.svg?colorA=00c1de&colorB=00539c)](https://www.arm.gov/) From 3bc6ca096c0ceb75daf83af8f41ebd541578b7de Mon Sep 17 00:00:00 2001 From: Bhupendra Raut Date: Thu, 14 May 2026 14:01:40 -0500 Subject: [PATCH 2/6] MOD: imports, pyproject.toml --- .github/workflows/ci.yml | 2 +- .importlinter | 47 ++++++++++++++++++++++++++++------------ pyproject.toml | 1 + 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b1a50fa..147e805 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,7 @@ jobs: fail-fast: false matrix: python-version: ["3.12", "3.13"] - os: [ubuntu, macOS, windows] + os: [ubuntu, macOS] steps: - name: Checkout repository diff --git a/.importlinter b/.importlinter index 19fbba7..b54ed12 100644 --- a/.importlinter +++ b/.importlinter @@ -1,4 +1,4 @@ -# Import Linter configuration for ADAPT +# Import Linter configuration for Adapt # # Purpose: # Enforce architectural boundaries so the codebase stays modular, @@ -19,22 +19,14 @@ root_packages = include_external_packages = False # ========================================================== -# 1. Scientific modules must remain independent +# 1. Scientific module independence — enforced by pytest # ========================================================== # -# No module may import from any other module — directly or -# transitively. Shared types belong in adapt.contracts. +# Inter-module imports and upward imports from modules/ are +# caught by tests/test_architecture.py, which auto-discovers +# subpackages at runtime. No explicit module list needed there. +# Run: pytest tests/test_architecture.py -[importlinter:contract:independent_modules] -name = Adapt modules remain independent -type = independence -modules = - adapt.modules.acquisition - adapt.modules.analysis - adapt.modules.detection - adapt.modules.ingest - adapt.modules.projection - adapt.modules.tracking # ========================================================== # 2. Modules do not depend on runtime orchestration @@ -68,12 +60,39 @@ type = forbidden source_modules = adapt.modules forbidden_modules = + adapt.execution adapt.persistence adapt.api adapt.gui adapt.visualization adapt.configuration +# ========================================================== +# 3b. Utils package imports no adapt internals +# ========================================================== +# +# adapt.utils contains pure stdlib functions shared across +# the codebase. It must not depend on any adapt layer or +# it creates import cycles and loses its zero-dependency value. + +[importlinter:contract:utils_are_pure] +name = Utils package imports no adapt internals +type = forbidden +source_modules = + adapt.utils +forbidden_modules = + adapt.modules + adapt.contracts + adapt.execution + adapt.runtime + adapt.persistence + adapt.configuration + adapt.api + adapt.gui + adapt.visualization + adapt.cli + adapt.extensions + # ========================================================== # 4. Persistence is infrastructure — no science, no orchestration # ========================================================== diff --git a/pyproject.toml b/pyproject.toml index de09d81..96f91bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ adapt = "adapt.cli:main" dev = [ "pytest>=7.0", "pytest-cov", + "import-linter>=2.0", ] docs = [ "sphinx>=7.0", From 110f8605df2bf2fa536c1092aa229aa2076208dd Mon Sep 17 00:00:00 2001 From: Bhupendra Raut Date: Thu, 14 May 2026 14:05:04 -0500 Subject: [PATCH 3/6] ENH:handled exceptions, add logger, no prints --- src/adapt/cli.py | 12 ++++++++---- src/adapt/gui/dashboard.py | 40 ++++++++++++++++++++++++-------------- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/src/adapt/cli.py b/src/adapt/cli.py index 6216bd2..4bb43e5 100644 --- a/src/adapt/cli.py +++ b/src/adapt/cli.py @@ -19,7 +19,7 @@ """ import argparse -import contextlib +import logging import os import signal import sys @@ -33,6 +33,8 @@ # Single-instance enforcement # --------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + _PID_FILE = Path.home() / '.adapt' / 'pipeline.pid' @@ -58,8 +60,10 @@ def _write_pid() -> None: def _remove_pid() -> None: - with contextlib.suppress(Exception): + try: _PID_FILE.unlink(missing_ok=True) + except OSError as exc: + logger.warning("Could not remove PID file %s: %s", _PID_FILE, exc) # --------------------------------------------------------------------------- @@ -82,8 +86,8 @@ def _build_run_nexrad_parser(sub: argparse.ArgumentParser) -> None: help='Processing mode', ) sub.add_argument('--start-time', dest='start_time', help='Start time (ISO 8601)') - sub.add_argument('--end-time', dest='end_time', help='End time (ISO 8601)') - sub.add_argument('--base-dir', dest='base_dir', help='Repository output directory') + sub.add_argument('--end-time', dest='end_time', help='End time (ISO 8601)') + sub.add_argument('--base-dir', dest='base_dir', help='Repository output directory') sub.add_argument( '--run-id', dest='run_id', diff --git a/src/adapt/gui/dashboard.py b/src/adapt/gui/dashboard.py index b40c015..022d232 100644 --- a/src/adapt/gui/dashboard.py +++ b/src/adapt/gui/dashboard.py @@ -23,6 +23,7 @@ import contextlib import copy +import logging import os import shutil import subprocess @@ -32,6 +33,8 @@ from datetime import datetime from pathlib import Path +logger = logging.getLogger(__name__) + @contextlib.contextmanager def _suppress_osx_stderr(): @@ -60,7 +63,7 @@ def _suppress_osx_stderr(): os.environ['PROJ_DATA'] = _pd os.environ['PROJ_LIB'] = _pd except Exception: - pass + logger.exception("Failed to configure PROJ environment variables") # ── Tkinter ─────────────────────────────────────────────────────────────────── import tkinter as tk # noqa: E402 @@ -136,7 +139,7 @@ def _cell_uid_disp(uid) -> str: if _pd.isna(uid): return '\u2014' except Exception: - pass + logger.exception("Failed to normalize cell UID display value") if uid is None: return '\u2014' return str(uid)[:4] @@ -184,7 +187,7 @@ def set_message(self, s): s = (f'x={x_km:.2f} y={y_km:.2f}' f' {lat_v:.4f}\u00b0 {lon_v:.4f}\u00b0') except Exception: - pass + logger.exception("Failed to update toolbar coordinate message") super().set_message(s) else: _CompactToolbar = None @@ -276,6 +279,7 @@ def _pipeline_running() -> bool: os.kill(pid, 0) return True except Exception: + logger.exception("Failed to verify pipeline PID status") return False @@ -296,7 +300,7 @@ def _list_radars(repo: Path) -> list: if radars: return sorted(radars) except Exception: - pass + logger.exception("Failed to list radars via DataClient; using filesystem fallback") # Fallback: filesystem scan for NEXRAD-style directories return sorted( @@ -346,7 +350,7 @@ def _list_runs(repo: Path, radar: str = None) -> list: runs.append(f'{run_id} ({mtime})') return runs except Exception: - pass + logger.exception("Failed to list runs via DataClient; using filesystem fallback") # Fallback: filesystem scan for runtime_config_*.json configs = sorted(repo.glob('runtime_config_*.json'), reverse=True) @@ -723,7 +727,7 @@ def _on_repo_changed(self): if latest_run and latest_run.get('radar') in radars: latest_radar = latest_run['radar'] except Exception: - pass + logger.exception("Failed to auto-select radar from latest run") if latest_radar: self._radar.set(latest_radar) @@ -782,6 +786,7 @@ def _start(self): start_new_session=True, ) except Exception as e: + logger.exception("Failed to launch pipeline subprocess: %s", cmd) messagebox.showerror('Launch failed', str(e), parent=self) return @@ -917,11 +922,14 @@ def _refresh_all(self): else: self._clear_time_series() except Exception: + logger.exception( + "Failed to refresh selected cell time series; clearing view" + ) self._clear_time_series() else: self._clear_time_series() except Exception: - pass + logger.exception("Failed to auto-refresh current NC canvas") else: # Canvas was cleared externally; re-render try: @@ -930,7 +938,7 @@ def _refresh_all(self): self._last_rendered_nc = latest self.scan_var.set(labels[-1] if labels else '') except Exception: - pass + logger.exception("Failed to render latest NC file during auto-refresh") self._refresh_table() if self._nb.index('current') == 2: @@ -1083,7 +1091,7 @@ def _load_cells_data(self, repo, radar): return conn.close() except Exception: - pass + logger.exception("Failed to load cells from SQLite catalog") # Fallback: parquet (may not contain cell_uid) pqs = sorted((Path(repo) / radar / 'analysis').glob('analysis2d_*.parquet')) @@ -1092,7 +1100,7 @@ def _load_cells_data(self, repo, radar): dfs = [pd.read_parquet(p) for p in pqs] self._current_cell_df = pd.concat(dfs, ignore_index=True) except Exception: - pass + logger.exception("Failed to load fallback parquet cell data") # ── NC loop render (cycle through N frames) ─────────────────────────────── @@ -1359,7 +1367,7 @@ def _add_basemap(ax, ds, x_km, y_km): source=ctx.providers.OpenStreetMap.Mapnik, alpha=0.6, attribution=False, zoom=8, zorder=0) except Exception as e: - print(f'Basemap error: {e}') + logger.warning("Basemap unavailable: %s", e) # ── Single update entry point ───────────────────────────────────────────── @@ -1416,7 +1424,7 @@ def _on_cell_click(self, event) -> None: r = matched.iloc[0] cell_uid = r.get('cell_uid') except Exception: - pass + logger.exception("Failed to resolve cell UID from track store") # Fallback: search loaded cell df with 60-s time window if cell_uid is None: @@ -1449,7 +1457,7 @@ def _on_cell_click(self, event) -> None: ts_obj = TrackStore(db_path) history_df = ts_obj.get_track_history(self._current_run_id, str(cell_uid)) except Exception: - pass + logger.exception("Failed to load tracking history from track store") if history_df is None or history_df.empty: df = self._current_cell_df @@ -1763,7 +1771,7 @@ def _f(key, fmt='.1f', suffix=''): self._hv[k].set(_em) except Exception: - pass + logger.exception("Failed to update hover stats values") # ── Cell statistics ─────────────────────────────────────────────────────── @@ -1790,6 +1798,7 @@ def _refresh_table(self): if rows: df = pd.DataFrame([dict(r) for r in rows]) except Exception: + logger.exception("DB stats query failed; falling back to parquet") df = None # Fallback: parquet @@ -1802,6 +1811,7 @@ def _refresh_table(self): dfs = [pd.read_parquet(p) for p in pqs] df = pd.concat(dfs, ignore_index=True) except Exception as e: + logger.exception("Failed to load parquet files for stats table") self.stats_lbl.config(text=f'Error: {e}') return @@ -1813,7 +1823,7 @@ def _refresh_table(self): df['scan_time'] = pd.to_datetime(df['scan_time'], utc=True) df['time_label'] = df['scan_time'].dt.strftime('%H:%M:%S') except Exception: - pass + logger.exception("Failed to parse scan_time column for table display") # Update slider range bounds from data for col, (lo_v, hi_v) in self._flt.items(): From d5e8073f50eab1139d0fb3467e23f9c6c2a80296 Mon Sep 17 00:00:00 2001 From: Bhupendra Raut Date: Thu, 14 May 2026 14:09:18 -0500 Subject: [PATCH 4/6] REF:moved execution nodes from module files" execution node separation (modules cleaned up, nodes moved to execution/) --- src/adapt/configuration/defaults.yaml | 10 +- src/adapt/execution/nodes/__init__.py | 2 + src/adapt/execution/nodes/analysis.py | 55 +++++++++++ src/adapt/execution/nodes/detection.py | 53 ++++++++++ src/adapt/execution/nodes/ingest.py | 102 +++++++++++++++++++ src/adapt/execution/nodes/projection.py | 60 ++++++++++++ src/adapt/execution/nodes/tracking.py | 63 ++++++++++++ src/adapt/modules/analysis/module.py | 95 +----------------- src/adapt/modules/base.py | 42 ++++---- src/adapt/modules/detection/module.py | 63 ------------ src/adapt/modules/ingest/module.py | 108 --------------------- src/adapt/modules/projection/module.py | 66 ------------- src/adapt/modules/tracking/__init__.py | 5 - src/adapt/modules/tracking/module.py | 124 ++---------------------- 14 files changed, 368 insertions(+), 480 deletions(-) create mode 100644 src/adapt/execution/nodes/__init__.py create mode 100644 src/adapt/execution/nodes/analysis.py create mode 100644 src/adapt/execution/nodes/detection.py create mode 100644 src/adapt/execution/nodes/ingest.py create mode 100644 src/adapt/execution/nodes/projection.py create mode 100644 src/adapt/execution/nodes/tracking.py diff --git a/src/adapt/configuration/defaults.yaml b/src/adapt/configuration/defaults.yaml index 04c5218..5734d77 100644 --- a/src/adapt/configuration/defaults.yaml +++ b/src/adapt/configuration/defaults.yaml @@ -6,8 +6,8 @@ pipeline: modules: - - adapt.modules.ingest.module - - adapt.modules.detection.module - - adapt.modules.projection.module - - adapt.modules.analysis.module - - adapt.modules.tracking.module + - adapt.execution.nodes.ingest + - adapt.execution.nodes.detection + - adapt.execution.nodes.projection + - adapt.execution.nodes.analysis + - adapt.execution.nodes.tracking diff --git a/src/adapt/execution/nodes/__init__.py b/src/adapt/execution/nodes/__init__.py new file mode 100644 index 0000000..7a02e11 --- /dev/null +++ b/src/adapt/execution/nodes/__init__.py @@ -0,0 +1,2 @@ +# Copyright © 2026, UChicago Argonne, LLC +# See LICENSE for terms and disclaimer. diff --git a/src/adapt/execution/nodes/analysis.py b/src/adapt/execution/nodes/analysis.py new file mode 100644 index 0000000..9eb1df0 --- /dev/null +++ b/src/adapt/execution/nodes/analysis.py @@ -0,0 +1,55 @@ +# Copyright © 2026, UChicago Argonne, LLC +# See LICENSE for terms and disclaimer. + +from adapt.contracts import check_cell_adjacency, check_cell_stats +from adapt.execution.module_registry import registry +from adapt.modules.analysis.module import RadarCellAnalyzer +from adapt.modules.base import BaseModule + + +class AnalysisModule(BaseModule): + """BaseModule wrapper for RadarCellAnalyzer. + + Extracts per-cell statistics (area, reflectivity, motion, centroids) + from a segmented/projected 2D dataset. Pure compute — no I/O. + Persistence is the processor's responsibility. + + Context inputs + -------------- + projected_ds : xr.Dataset + 2D dataset with projections (output of ProjectionModule). + config : InternalConfig + Runtime configuration. + scan_time : datetime + Radar scan timestamp (from LoadModule). + + Context outputs + --------------- + cell_stats : pd.DataFrame + Per-cell statistics DataFrame. + cell_adjacency : pd.DataFrame + Touching-cell pairs DataFrame. + """ + + name = "analysis" + inputs = ["projected_ds", "analysis_config", "scan_time"] + outputs = ["cell_stats", "cell_adjacency"] + output_contracts = {"cell_stats": check_cell_stats, "cell_adjacency": check_cell_adjacency} + + def __init__(self) -> None: + self._analyzer = None + + def run(self, context: dict) -> dict: + config = context["analysis_config"] + ds_2d = context["projected_ds"] + + if self._analyzer is None: + self._analyzer = RadarCellAnalyzer(config) + + df_cells = self._analyzer.extract(ds_2d, z_level=config.z_level) + df_adjacency = self._analyzer.extract_adjacency(ds_2d) + + return {"cell_stats": df_cells, "cell_adjacency": df_adjacency} + + +registry.register(AnalysisModule) diff --git a/src/adapt/execution/nodes/detection.py b/src/adapt/execution/nodes/detection.py new file mode 100644 index 0000000..6aaa227 --- /dev/null +++ b/src/adapt/execution/nodes/detection.py @@ -0,0 +1,53 @@ +# Copyright © 2026, UChicago Argonne, LLC +# See LICENSE for terms and disclaimer. + +from adapt.contracts import check_grid_ds_2d, check_segmented_ds +from adapt.execution.module_registry import registry +from adapt.modules.base import BaseModule +from adapt.modules.detection.module import RadarCellSegmenter + + +class DetectModule(BaseModule): + """BaseModule wrapper for RadarCellSegmenter. + + Segments convective cells from a 2D reflectivity field using + threshold and morphological filtering. + + Context inputs + -------------- + grid_ds_2d : xr.Dataset + 2D Cartesian dataset (output of LoadModule). + config : InternalConfig + Runtime configuration. + + Context outputs + --------------- + segmented_ds : xr.Dataset + 2D dataset with cell_labels variable added. + num_cells : int + Number of detected cells. + """ + + name = "detection" + inputs = ["grid_ds_2d", "detection_config"] + outputs = ["segmented_ds", "num_cells"] + input_contracts = {"grid_ds_2d": check_grid_ds_2d} + output_contracts = {"segmented_ds": check_segmented_ds} + + def __init__(self) -> None: + self._segmenter = None + + def run(self, context: dict) -> dict: + config = context["detection_config"] + ds_2d = context["grid_ds_2d"] + + if self._segmenter is None: + self._segmenter = RadarCellSegmenter(config) + + segmented = self._segmenter.segment(ds_2d) + num_cells = int(segmented[config.labels_var].max().item()) + + return {"segmented_ds": segmented, "num_cells": num_cells} + + +registry.register(DetectModule) diff --git a/src/adapt/execution/nodes/ingest.py b/src/adapt/execution/nodes/ingest.py new file mode 100644 index 0000000..29d750a --- /dev/null +++ b/src/adapt/execution/nodes/ingest.py @@ -0,0 +1,102 @@ +# Copyright © 2026, UChicago Argonne, LLC +# See LICENSE for terms and disclaimer. + +from datetime import UTC +from datetime import datetime as _dt +from pathlib import Path + +import numpy as np +import xarray as _xr + +from adapt.contracts import check_grid_ds_2d +from adapt.execution.module_registry import registry +from adapt.modules.base import BaseModule +from adapt.modules.ingest.module import RadarDataLoader + + +class LoadModule(BaseModule): + """BaseModule wrapper for RadarDataLoader. + + Reads a NEXRAD Level-II file, regrids it to Cartesian coordinates, + and extracts a 2D horizontal slice at the configured z-level. + + Context inputs + -------------- + nexrad_file : str + Path to the NEXRAD Level-II file. + config : InternalConfig + Runtime configuration (lazy-initialises the loader on first call). + output_dirs : dict + Output directory mapping (used for saving intermediate NetCDF). + + Context outputs + --------------- + grid_ds : xr.Dataset + Full 3D Cartesian xarray Dataset. + grid_ds_2d : xr.Dataset + 2D slice at configured z-level. + scan_time : datetime + Radar volume scan time parsed from the filename. + """ + + name = "ingest" + inputs = ["nexrad_file", "ingest_config"] + outputs = ["grid_ds", "grid_ds_2d", "scan_time"] + output_contracts = {"grid_ds_2d": check_grid_ds_2d} + + def __init__(self) -> None: + self._loader = None + + def run(self, context: dict) -> dict: + config = context["ingest_config"] + filepath = context["nexrad_file"] + output_dirs = context.get("output_dirs", {}) + + if self._loader is None: + self._loader = RadarDataLoader(config) + + radar = config.radar + nc_filename = Path(filepath).stem + scan_time = _dt.now(UTC) + try: + parts = nc_filename.split("_") + dt_str = parts[0][-8:] + parts[1] + scan_time = _dt.strptime(dt_str, "%Y%m%d%H%M%S") + except Exception: + pass + + date_str = scan_time.strftime("%Y%m%d") + base = output_dirs.get("base") + nc_path = base / radar / "gridnc" / date_str / nc_filename if base else None + output_dir = str(nc_path.parent) if nc_path else None + + ds = self._loader.load_and_regrid( + filepath, + save_netcdf=config.save_netcdf, + output_dir=output_dir, + ) + + if ds is None: + raise RuntimeError(f"Ingest failed: load_and_regrid returned None for {filepath}") + + z_level = config.z_level + z_name = config.z_coord + time_name = config.time_coord + z_idx = int(np.argmin(np.abs(ds[z_name].values - z_level))) + + ds_2d = _xr.Dataset() + for var_name in ds.data_vars: + var = ds[var_name] + if time_name in var.dims and z_name in var.dims: + ds_2d[var_name] = var.isel({time_name: 0, z_name: z_idx}) + else: + ds_2d[var_name] = var + for coord in ds.coords: + if coord not in ds_2d.coords: + ds_2d = ds_2d.assign_coords({coord: ds[coord]}) + ds_2d.attrs.update(ds.attrs) + + return {"grid_ds": ds, "grid_ds_2d": ds_2d, "scan_time": scan_time} + + +registry.register(LoadModule) diff --git a/src/adapt/execution/nodes/projection.py b/src/adapt/execution/nodes/projection.py new file mode 100644 index 0000000..1b26e19 --- /dev/null +++ b/src/adapt/execution/nodes/projection.py @@ -0,0 +1,60 @@ +# Copyright © 2026, UChicago Argonne, LLC +# See LICENSE for terms and disclaimer. + +from adapt.contracts import check_projected_ds, check_segmented_ds +from adapt.execution.module_registry import registry +from adapt.modules.base import BaseModule +from adapt.modules.projection.module import RadarCellProjector + + +class ProjectionModule(BaseModule): + """BaseModule wrapper for RadarCellProjector. + + Computes optical flow between consecutive radar frames and projects + cell positions forward in time. Stateless: receives the frame pair + via the context key ``dataset_history`` (injected by the processor). + + Context inputs + -------------- + segmented_ds : xr.Dataset + 2D segmented dataset for the current frame (output of DetectModule). + dataset_history : list of (str, xr.Dataset) + Rolling history of (filepath, segmented_ds) tuples supplied by the + processor. Must contain exactly 2 entries before this module is called. + config : InternalConfig + Runtime configuration. + + Context outputs + --------------- + projected_ds : xr.Dataset + 2D dataset with heading_x, heading_y, and cell_projections added. + """ + + name = "projection" + inputs = ["segmented_ds", "dataset_history", "projection_config"] + outputs = ["projected_ds"] + input_contracts = {"segmented_ds": check_segmented_ds} + output_contracts = {"projected_ds": check_projected_ds} + + def __init__(self) -> None: + self._projector = None + + def run(self, context: dict) -> dict: + config = context["projection_config"] + dataset_history = context["dataset_history"] # list of (filepath, ds_2d) + + if self._projector is None: + self._projector = RadarCellProjector(config) + + if len(dataset_history) < 2: + raise ValueError( + f"ProjectionModule requires 2 frames in dataset_history, " + f"got {len(dataset_history)}. Processor must pair frames before calling." + ) + + ds_list = [ds for _, ds in dataset_history] + projected = self._projector.project(ds_list) + return {"projected_ds": projected} + + +registry.register(ProjectionModule) diff --git a/src/adapt/execution/nodes/tracking.py b/src/adapt/execution/nodes/tracking.py new file mode 100644 index 0000000..34cfc8e --- /dev/null +++ b/src/adapt/execution/nodes/tracking.py @@ -0,0 +1,63 @@ +# Copyright © 2026, UChicago Argonne, LLC +# See LICENSE for terms and disclaimer. + +from adapt.contracts import check_cell_events, check_projected_ds, check_tracked_cells +from adapt.execution.module_registry import registry +from adapt.modules.base import BaseModule +from adapt.modules.tracking.module import RadarCellTracker + + +class TrackingModule(BaseModule): + """Assign stable `cell_uid` identities to convective cells across consecutive radar scans. + + Produces scan-local tracking outputs. Any higher-level grouping/aggregation + is outside this module's scope. + + Context inputs + -------------- + projected_ds : xr.Dataset + 2D dataset with projections (output of ProjectionModule). + cell_stats : pd.DataFrame + Per-cell statistics (output of AnalysisModule). + tracking_config : TrackingModuleConfig + Runtime configuration for the tracker. + scan_time : datetime + Radar scan timestamp. + + Context outputs + --------------- + tracked_cells : pd.DataFrame + Per-cell observations for the current scan with cell_uid/cell_label. + cell_events : pd.DataFrame + Explicit event rows for CONTINUE, SPLIT, MERGE, INITIATION, TERMINATION. + """ + + name = "tracking" + inputs = ["projected_ds", "cell_stats", "tracking_config", "scan_time"] + outputs = ["tracked_cells", "cell_events"] + input_contracts = {"projected_ds": check_projected_ds} + output_contracts = {"tracked_cells": check_tracked_cells, "cell_events": check_cell_events} + + def __init__(self) -> None: + self._tracker = None + + def run(self, context: dict) -> dict: + config = context["tracking_config"] + ds_2d = context["projected_ds"] + cell_stats = context["cell_stats"] + + if self._tracker is None: + self._tracker = RadarCellTracker(config) + + tracked_cells, cell_events = self._tracker.track( + ds_projected=ds_2d, + cell_stats_df=cell_stats, + ) + + return { + "tracked_cells": tracked_cells, + "cell_events": cell_events, + } + + +registry.register(TrackingModule) diff --git a/src/adapt/modules/analysis/module.py b/src/adapt/modules/analysis/module.py index 55a51f6..9d59583 100644 --- a/src/adapt/modules/analysis/module.py +++ b/src/adapt/modules/analysis/module.py @@ -22,10 +22,8 @@ Author: Bhupendra Raut """ -import contextlib import json import logging -from datetime import UTC import numpy as np import pandas as pd @@ -33,6 +31,8 @@ from scipy.ndimage import center_of_mass from skimage.measure import regionprops +from adapt.utils.time import normalize_time_scalar + __all__ = ['RadarCellAnalyzer'] logger = logging.getLogger(__name__) @@ -356,30 +356,6 @@ def _pixel_area_km2(self, ds): dy = float(np.abs(ds.y[1] - ds.y[0])) return (dx * dy) / 1e6 - @staticmethod - def _normalize_time_scalar(time_val): - tv = time_val - while isinstance(tv, np.ndarray) and tv.size == 1: - tv = tv.reshape(-1)[0] - if isinstance(tv, np.ndarray): - tv = tv.reshape(-1)[0] - if hasattr(tv, "item"): - with contextlib.suppress(Exception): - tv = tv.item() - if getattr(type(tv), "__module__", "").startswith("cftime"): - from datetime import datetime - tv = datetime( - int(tv.year), - int(tv.month), - int(tv.day), - int(tv.hour), - int(tv.minute), - int(tv.second), - int(getattr(tv, "microsecond", 0) or 0), - tzinfo=UTC, - ) - return tv - def _get_lat_lon_grids(self, ds): """Get lat/lon grids from dataset. @@ -475,7 +451,7 @@ def _extract_region_props(self, region, label_array, refl, lat_grid, lon_grid, # Get scan start time scan_time = "" if "time" in ds.coords: - tv = self._normalize_time_scalar(ds.time.values) + tv = normalize_time_scalar(ds.time.values) scan_time = pd.Timestamp(tv).isoformat() # === GEOMETRIC CENTROID (center of mass of binary mask) === @@ -655,68 +631,3 @@ def get_lat_lon(ix, iy, lat_grid, lon_grid): return np.nan, np.nan return float(lat), float(lon) - - - -# --------------------------------------------------------------------------- -# BaseModule wrapper — Step 6 -# --------------------------------------------------------------------------- - -from adapt.contracts import assert_analysis_output, assert_cell_adjacency # noqa: E402 -from adapt.execution.module_registry import registry # noqa: E402 -from adapt.modules.base import BaseModule # noqa: E402 - - -def _check_cell_stats(df): - assert_analysis_output(df) - -def _check_cell_adjacency(df): - assert_cell_adjacency(df) - - -class AnalysisModule(BaseModule): - """BaseModule wrapper for RadarCellAnalyzer. - - Extracts per-cell statistics (area, reflectivity, motion, centroids) - from a segmented/projected 2D dataset. Pure compute — no I/O. - Persistence is the processor's responsibility. - - Context inputs - -------------- - projected_ds : xr.Dataset - 2D dataset with projections (output of ProjectionModule). - config : InternalConfig - Runtime configuration. - scan_time : datetime - Radar scan timestamp (from LoadModule). - - Context outputs - --------------- - cell_stats : pd.DataFrame - Per-cell statistics DataFrame. - cell_adjacency : pd.DataFrame - Touching-cell pairs DataFrame. - """ - - name = "analysis" - inputs = ["projected_ds", "analysis_config", "scan_time"] - outputs = ["cell_stats", "cell_adjacency"] - output_contracts = {"cell_stats": _check_cell_stats, "cell_adjacency": _check_cell_adjacency} - - def __init__(self) -> None: - self._analyzer = None - - def run(self, context: dict) -> dict: - config = context["analysis_config"] - ds_2d = context["projected_ds"] - - if self._analyzer is None: - self._analyzer = RadarCellAnalyzer(config) - - df_cells = self._analyzer.extract(ds_2d, z_level=config.z_level) - df_adjacency = self._analyzer.extract_adjacency(ds_2d) - - return {"cell_stats": df_cells, "cell_adjacency": df_adjacency} - - -registry.register(AnalysisModule) diff --git a/src/adapt/modules/base.py b/src/adapt/modules/base.py index 39752bb..8e0ac18 100644 --- a/src/adapt/modules/base.py +++ b/src/adapt/modules/base.py @@ -1,50 +1,44 @@ # Copyright © 2026, UChicago Argonne, LLC # See LICENSE for terms and disclaimer. -"""Base interface for all Adapt processing modules. +"""Base interface for all Adapt execution nodes. -Every module in the system — whether in modules/ or extensions/ — must -declare its name, inputs, and outputs. The graph engine uses these -declarations to build the execution DAG automatically. +Every node in the system — whether in execution/nodes/ or extensions/ — must +declare its name, inputs, outputs, and optionally input/output contracts. +The graph engine uses these declarations to build the execution DAG automatically. -Existing scientific classes (AwsNexradDownloader, RadarCellSegmenter, etc.) -are NOT required to inherit BaseModule in Step 1. They are wrapped in -Step 6 of the refactor. BaseModule is the target interface definition. +Contract functions come from adapt.contracts — import them there, not here. """ from abc import ABC, abstractmethod from typing import ClassVar -from adapt.contracts import ContractViolation, require # noqa: F401 — re-exported for callers - -# ──────────────────────────────────────────────────────────────────────────── -# BaseModule Interface -# ──────────────────────────────────────────────────────────────────────────── - class BaseModule(ABC): - """Abstract base for all Adapt processing modules. + """Abstract base for all Adapt execution nodes. Subclasses declare: - ``name``: unique identifier used in the execution graph - - ``inputs``: list of data keys this module reads from context - - ``outputs``: list of data keys this module writes to context - - ``input_contracts``: optional {key: callable} validators run before run() - - ``output_contracts``: optional {key: callable} validators run after run() + - ``inputs``: list of context keys this node reads + - ``outputs``: list of context keys this node writes + - ``input_contracts``: optional {key: check_fn} validated before run() + - ``output_contracts``: optional {key: check_fn} validated after run() - The graph engine matches ``outputs`` of upstream modules to ``inputs`` - of downstream modules to resolve execution order automatically. - Contract callables are invoked by GraphExecutor automatically — modules - do not need to call them manually. + The graph engine matches ``outputs`` of upstream nodes to ``inputs`` + of downstream nodes to resolve execution order automatically. + Contract callables (from adapt.contracts) are invoked by GraphExecutor + automatically — nodes do not call them manually. Example:: + from adapt.contracts import check_grid_ds_2d, check_segmented_ds + class DetectModule(BaseModule): name = "detection" inputs = ["grid_ds_2d"] outputs = ["segmented_ds"] - input_contracts = {"grid_ds_2d": assert_gridded} - output_contracts = {"segmented_ds": assert_segmented} + input_contracts = {"grid_ds_2d": check_grid_ds_2d} + output_contracts = {"segmented_ds": check_segmented_ds} def run(self, context): grid = context["grid_ds_2d"] diff --git a/src/adapt/modules/detection/module.py b/src/adapt/modules/detection/module.py index c89fc14..c985636 100644 --- a/src/adapt/modules/detection/module.py +++ b/src/adapt/modules/detection/module.py @@ -357,66 +357,3 @@ def _relabel_by_size( old_to_new[labels_sorted] = np.arange(1, len(labels_sorted) + 1) return old_to_new[labels] - - -# --------------------------------------------------------------------------- -# BaseModule wrapper — Step 6 -# --------------------------------------------------------------------------- - -from adapt.contracts import assert_gridded, assert_segmented # noqa: E402 -from adapt.execution.module_registry import registry # noqa: E402 -from adapt.modules.base import BaseModule # noqa: E402 - - -def _check_grid_ds_2d(ds): - assert_gridded(ds, "reflectivity") - - -def _check_segmented_ds(ds): - assert_segmented(ds, "cell_labels") - - -class DetectModule(BaseModule): - """BaseModule wrapper for RadarCellSegmenter. - - Segments convective cells from a 2D reflectivity field using - threshold and morphological filtering. - - Context inputs - -------------- - grid_ds_2d : xr.Dataset - 2D Cartesian dataset (output of LoadModule). - config : InternalConfig - Runtime configuration. - - Context outputs - --------------- - segmented_ds : xr.Dataset - 2D dataset with cell_labels variable added. - num_cells : int - Number of detected cells. - """ - - name = "detection" - inputs = ["grid_ds_2d", "detection_config"] - outputs = ["segmented_ds", "num_cells"] - input_contracts = {"grid_ds_2d": _check_grid_ds_2d} - output_contracts = {"segmented_ds": _check_segmented_ds} - - def __init__(self) -> None: - self._segmenter = None - - def run(self, context: dict) -> dict: - config = context["detection_config"] - ds_2d = context["grid_ds_2d"] - - if self._segmenter is None: - self._segmenter = RadarCellSegmenter(config) - - segmented = self._segmenter.segment(ds_2d) - num_cells = int(segmented[config.labels_var].max().item()) - - return {"segmented_ds": segmented, "num_cells": num_cells} - - -registry.register(DetectModule) diff --git a/src/adapt/modules/ingest/module.py b/src/adapt/modules/ingest/module.py index 832f228..ff8c57d 100644 --- a/src/adapt/modules/ingest/module.py +++ b/src/adapt/modules/ingest/module.py @@ -337,111 +337,3 @@ def load_and_regrid(self, filepath: Path | str, grid_kwargs: dict = None, if __name__ == "__main__": print("RadarDataLoader loaded. Use: loader = RadarDataLoader(config)") - - - -# --------------------------------------------------------------------------- -# BaseModule wrapper — Step 6 -# --------------------------------------------------------------------------- - -from datetime import UTC # noqa: E402 -from datetime import datetime as _dt # noqa: E402 - -import numpy as np # noqa: E402 -import xarray as _xr # noqa: E402 - -from adapt.contracts import assert_gridded # noqa: E402 -from adapt.execution.module_registry import registry # noqa: E402 -from adapt.modules.base import BaseModule # noqa: E402 - - -def _check_grid_ds_2d(ds): - assert_gridded(ds, "reflectivity") - - -class LoadModule(BaseModule): - """BaseModule wrapper for RadarDataLoader. - - Reads a NEXRAD Level-II file, regrids it to Cartesian coordinates, - and extracts a 2D horizontal slice at the configured z-level. - - Context inputs - -------------- - nexrad_file : str - Path to the NEXRAD Level-II file. - config : InternalConfig - Runtime configuration (lazy-initialises the loader on first call). - output_dirs : dict - Output directory mapping (used for saving intermediate NetCDF). - - Context outputs - --------------- - grid_ds : xr.Dataset - Full 3D Cartesian xarray Dataset. - grid_ds_2d : xr.Dataset - 2D slice at configured z-level. - scan_time : datetime - Radar volume scan time parsed from the filename. - """ - - name = "ingest" - inputs = ["nexrad_file", "ingest_config"] - outputs = ["grid_ds", "grid_ds_2d", "scan_time"] - output_contracts = {"grid_ds_2d": _check_grid_ds_2d} - - def __init__(self) -> None: - self._loader = None - - def run(self, context: dict) -> dict: - config = context["ingest_config"] - filepath = context["nexrad_file"] - output_dirs = context.get("output_dirs", {}) - - if self._loader is None: - self._loader = RadarDataLoader(config) - - radar = config.radar - nc_filename = Path(filepath).stem - scan_time = _dt.now(UTC) - try: - parts = nc_filename.split("_") - dt_str = parts[0][-8:] + parts[1] - scan_time = _dt.strptime(dt_str, "%Y%m%d%H%M%S") - except Exception: - pass - - date_str = scan_time.strftime("%Y%m%d") - base = output_dirs.get("base") - nc_path = base / radar / "gridnc" / date_str / nc_filename if base else None - output_dir = str(nc_path.parent) if nc_path else None - - ds = self._loader.load_and_regrid( - filepath, - save_netcdf=config.save_netcdf, - output_dir=output_dir, - ) - - if ds is None: - raise RuntimeError(f"Ingest failed: load_and_regrid returned None for {filepath}") - - z_level = config.z_level - z_name = config.z_coord - time_name = config.time_coord - z_idx = int(np.argmin(np.abs(ds[z_name].values - z_level))) - - ds_2d = _xr.Dataset() - for var_name in ds.data_vars: - var = ds[var_name] - if time_name in var.dims and z_name in var.dims: - ds_2d[var_name] = var.isel({time_name: 0, z_name: z_idx}) - else: - ds_2d[var_name] = var - for coord in ds.coords: - if coord not in ds_2d.coords: - ds_2d = ds_2d.assign_coords({coord: ds[coord]}) - ds_2d.attrs.update(ds.attrs) - - return {"grid_ds": ds, "grid_ds_2d": ds_2d, "scan_time": scan_time} - - -registry.register(LoadModule) diff --git a/src/adapt/modules/projection/module.py b/src/adapt/modules/projection/module.py index 4ddfadf..e8f7709 100644 --- a/src/adapt/modules/projection/module.py +++ b/src/adapt/modules/projection/module.py @@ -557,69 +557,3 @@ def _fill_concave_hull(self, label_mask, alpha=0.1): logger.warning(f"Concave hull failed: {e}, falling back to dilation") kernel = np.ones((3, 3), dtype=np.uint8) return binary_dilation(label_mask, structure=kernel).astype(np.uint8) - - - -# --------------------------------------------------------------------------- -# BaseModule wrapper — Step 6 -# --------------------------------------------------------------------------- - -from adapt.contracts import assert_segmented # noqa: E402 -from adapt.execution.module_registry import registry # noqa: E402 -from adapt.modules.base import BaseModule # noqa: E402 - - -def _check_segmented_ds(ds): - assert_segmented(ds, "cell_labels") - - -class ProjectionModule(BaseModule): - """BaseModule wrapper for RadarCellProjector. - - Computes optical flow between consecutive radar frames and projects - cell positions forward in time. Stateless: receives the frame pair - via the context key ``dataset_history`` (injected by the processor). - - Context inputs - -------------- - segmented_ds : xr.Dataset - 2D segmented dataset for the current frame (output of DetectModule). - dataset_history : list of (str, xr.Dataset) - Rolling history of (filepath, segmented_ds) tuples supplied by the - processor. Must contain exactly 2 entries before this module is called. - config : InternalConfig - Runtime configuration. - - Context outputs - --------------- - projected_ds : xr.Dataset - 2D dataset with heading_x, heading_y, and cell_projections added. - """ - - name = "projection" - inputs = ["segmented_ds", "dataset_history", "projection_config"] - outputs = ["projected_ds"] - input_contracts = {"segmented_ds": _check_segmented_ds} - - def __init__(self) -> None: - self._projector = None - - def run(self, context: dict) -> dict: - config = context["projection_config"] - dataset_history = context["dataset_history"] # list of (filepath, ds_2d) - - if self._projector is None: - self._projector = RadarCellProjector(config) - - if len(dataset_history) < 2: - raise ValueError( - f"ProjectionModule requires 2 frames in dataset_history, " - f"got {len(dataset_history)}. Processor must pair frames before calling." - ) - - ds_list = [ds for _, ds in dataset_history] - projected = self._projector.project(ds_list) - return {"projected_ds": projected} - - -registry.register(ProjectionModule) diff --git a/src/adapt/modules/tracking/__init__.py b/src/adapt/modules/tracking/__init__.py index c7a282f..5a04538 100644 --- a/src/adapt/modules/tracking/__init__.py +++ b/src/adapt/modules/tracking/__init__.py @@ -1,8 +1,3 @@ # Copyright © 2026, UChicago Argonne, LLC # See LICENSE for terms and disclaimer. -"""Storm cell tracking module.""" - -from adapt.modules.tracking.module import TrackingModule - -__all__ = ["TrackingModule"] diff --git a/src/adapt/modules/tracking/module.py b/src/adapt/modules/tracking/module.py index 4338ed8..f3e3967 100644 --- a/src/adapt/modules/tracking/module.py +++ b/src/adapt/modules/tracking/module.py @@ -33,11 +33,9 @@ Journal of Applied Meteorology and Climatology, 60(4), 513-526. """ -import contextlib import hashlib import logging import string -from datetime import UTC import networkx as nx import numpy as np @@ -45,7 +43,9 @@ import xarray as xr from scipy.optimize import linear_sum_assignment -__all__ = ['RadarCellTracker', 'TrackingModule'] +from adapt.utils.time import normalize_time_scalar + +__all__ = ['RadarCellTracker'] logger = logging.getLogger(__name__) @@ -428,7 +428,7 @@ def get_cell_identity(self, track_index: int) -> tuple[str, str]: # ------------------------------------------------------------------ def _get_time(self, ds: xr.Dataset): - return self._normalize_time_scalar(ds.time.values) + return normalize_time_scalar(ds.time.values) @staticmethod def _to_epoch_seconds(time_val) -> float: @@ -437,46 +437,10 @@ def _to_epoch_seconds(time_val) -> float: ts = ts.tz_localize("UTC") return float(ts.timestamp()) - @staticmethod - def _normalize_time_scalar(time_val): - """Normalize xarray/cftime/numpy time representations to a scalar. - - Returns a scalar compatible with pandas.Timestamp: - - np.datetime64 - - datetime.datetime - - or a scalar string / timestamp-like object - """ - tv = time_val - while isinstance(tv, np.ndarray) and tv.size == 1: - tv = tv.reshape(-1)[0] - if isinstance(tv, np.ndarray): - tv = tv.reshape(-1)[0] - - if hasattr(tv, "item"): - with contextlib.suppress(Exception): - tv = tv.item() - - # Handle cftime.* objects (pandas cannot convert them directly) - if getattr(type(tv), "__module__", "").startswith("cftime"): - from datetime import datetime - - tv = datetime( - int(tv.year), - int(tv.month), - int(tv.day), - int(tv.hour), - int(tv.minute), - int(tv.second), - int(getattr(tv, "microsecond", 0) or 0), - tzinfo=UTC, - ) - - return tv - @staticmethod def _time_key(time_val) -> str: """Stable ISO8601 time key for event grouping.""" - tv = RadarCellTracker._normalize_time_scalar(time_val) + tv = normalize_time_scalar(time_val) return pd.Timestamp(tv).isoformat() def _extract_cells_from_analyzer( @@ -778,7 +742,7 @@ def _build_tracked_cells_current(self, time, node_ids: list[int]) -> pd.DataFram rows: list[dict] = [] for node_id in node_ids: node = self.graph.graph.nodes[node_id] - time_val = RadarCellTracker._normalize_time_scalar(node["time"]) + time_val = normalize_time_scalar(node["time"]) time_val = pd.Timestamp(time_val).to_datetime64() cell_uid = str(node["cell_uid"]) rows.append( @@ -821,7 +785,7 @@ def _build_cell_events_dataframe(events: list[dict]) -> pd.DataFrame: df[col] = None df = df[cols] df["time"] = df["time"].apply( - lambda t: pd.Timestamp(RadarCellTracker._normalize_time_scalar(t)) + lambda t: pd.Timestamp(normalize_time_scalar(t)) ) return df @@ -924,77 +888,3 @@ def _event_termination(self, time, source_node_id: int, target_node_id: int | No } -# ============================================================================= -# BaseModule wrapper (Phase 6 implementation placeholder) -# ============================================================================= - -from adapt.contracts import ( # noqa: E402 - assert_cell_events, - assert_projected, - assert_tracked_cells, -) -from adapt.execution.module_registry import registry # noqa: E402 -from adapt.modules.base import BaseModule # noqa: E402 - - -def _check_projected_ds(ds: xr.Dataset) -> None: - assert_projected(ds) - - -def _check_tracked_cells(df: pd.DataFrame) -> None: - if not df.empty: - assert_tracked_cells(df) - - -def _check_cell_events(df: pd.DataFrame) -> None: - if not df.empty: - assert_cell_events(df) - - -class TrackingModule(BaseModule): - """Assign stable `cell_uid` identities to convective cells across consecutive radar scans. - - Produces scan-local tracking outputs. Any higher-level grouping/aggregation - is outside this module's scope. - - Context outputs - --------------- - tracked_cells : pd.DataFrame - Per-cell observations for the current scan with cell_uid/cell_label. - cell_events : pd.DataFrame - Explicit event rows for CONTINUE, SPLIT, MERGE, INITIATION, TERMINATION. - """ - - name = "tracking" - inputs = ["projected_ds", "cell_stats", "tracking_config", "scan_time"] - outputs = ["tracked_cells", "cell_events"] - input_contracts = {"projected_ds": _check_projected_ds} - output_contracts = { - "tracked_cells": _check_tracked_cells, - "cell_events": _check_cell_events, - } - - def __init__(self) -> None: - self._tracker = None - - def run(self, context: dict) -> dict: - config = context["tracking_config"] - ds_2d = context["projected_ds"] - cell_stats = context["cell_stats"] - - if self._tracker is None: - self._tracker = RadarCellTracker(config) - - tracked_cells, cell_events = self._tracker.track( - ds_projected=ds_2d, - cell_stats_df=cell_stats, - ) - - return { - "tracked_cells": tracked_cells, - "cell_events": cell_events, - } - - -# Register module -registry.register(TrackingModule) From bbea4bbc4ecd6fbd67d3f3cdc825e184a6351b0e Mon Sep 17 00:00:00 2001 From: Bhupendra Raut Date: Thu, 14 May 2026 14:11:07 -0500 Subject: [PATCH 5/6] ADD:check bound contract wrappers & time contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit contracts — add check_* bound wrappers and time contract --- src/adapt/contracts/__init__.py | 39 ++++++++++++++++--- src/adapt/contracts/analysis.py | 10 +++++ src/adapt/contracts/grid.py | 5 +++ src/adapt/contracts/projection.py | 5 +++ src/adapt/contracts/segmentation.py | 5 +++ src/adapt/contracts/time.py | 59 +++++++++++++++++++++++++++++ src/adapt/contracts/tracking.py | 12 ++++++ 7 files changed, 130 insertions(+), 5 deletions(-) create mode 100644 src/adapt/contracts/time.py diff --git a/src/adapt/contracts/__init__.py b/src/adapt/contracts/__init__.py index 04aa1ba..5c5cd6a 100644 --- a/src/adapt/contracts/__init__.py +++ b/src/adapt/contracts/__init__.py @@ -5,16 +5,35 @@ All pipeline stage validators and the ContractViolation exception live here. Import from this package — never from individual contract submodules. + +Naming convention +----------------- +assert_* : primitive validators that may take extra arguments (e.g. variable + names). Call these from other validators or tests. +check_* : bound, zero-extra-arg wrappers. Register these directly in a + module's ``input_contracts`` or ``output_contracts`` dict. """ -from adapt.contracts.analysis import assert_analysis_output, assert_cell_adjacency -from adapt.contracts.grid import assert_gridded +from adapt.contracts.analysis import ( + assert_analysis_output, + assert_cell_adjacency, + check_cell_adjacency, + check_cell_stats, +) +from adapt.contracts.grid import assert_gridded, check_grid_ds_2d from adapt.contracts.pipeline import ContractViolation, require -from adapt.contracts.projection import assert_projected -from adapt.contracts.segmentation import assert_segmented -from adapt.contracts.tracking import assert_cell_events, assert_tracked_cells +from adapt.contracts.projection import assert_projected, check_projected_ds +from adapt.contracts.segmentation import assert_segmented, check_segmented_ds +from adapt.contracts.time import assert_time_normalized, check_time_normalized +from adapt.contracts.tracking import ( + assert_cell_events, + assert_tracked_cells, + check_cell_events, + check_tracked_cells, +) __all__ = [ + # primitives "ContractViolation", "require", "assert_gridded", @@ -24,4 +43,14 @@ "assert_cell_adjacency", "assert_tracked_cells", "assert_cell_events", + "assert_time_normalized", + # bound checks — register these in input_contracts / output_contracts + "check_grid_ds_2d", + "check_segmented_ds", + "check_projected_ds", + "check_cell_stats", + "check_cell_adjacency", + "check_tracked_cells", + "check_cell_events", + "check_time_normalized", ] diff --git a/src/adapt/contracts/analysis.py b/src/adapt/contracts/analysis.py index e50c1f2..16e74d1 100644 --- a/src/adapt/contracts/analysis.py +++ b/src/adapt/contracts/analysis.py @@ -94,3 +94,13 @@ def assert_cell_adjacency(df: pd.DataFrame) -> None: (df["touching_boundary_pixels"] >= 1).all(), "Cell adjacency contract violated: touching_boundary_pixels must be >= 1", ) + + +def check_cell_stats(df: pd.DataFrame) -> None: + """Bound contract for cell statistics DataFrame.""" + assert_analysis_output(df) + + +def check_cell_adjacency(df: pd.DataFrame) -> None: + """Bound contract for cell adjacency DataFrame.""" + assert_cell_adjacency(df) diff --git a/src/adapt/contracts/grid.py b/src/adapt/contracts/grid.py index 39ab496..ac347b4 100644 --- a/src/adapt/contracts/grid.py +++ b/src/adapt/contracts/grid.py @@ -38,3 +38,8 @@ def assert_gridded(ds: xr.Dataset, reflectivity_var: str) -> None: refl.ndim == 2, f"Grid contract violated: '{reflectivity_var}' has {refl.ndim} dims, expected 2", ) + + +def check_grid_ds_2d(ds: xr.Dataset) -> None: + """Bound contract for the standard 2D grid output (reflectivity variable name fixed).""" + assert_gridded(ds, "reflectivity") diff --git a/src/adapt/contracts/projection.py b/src/adapt/contracts/projection.py index 625d591..61026ff 100644 --- a/src/adapt/contracts/projection.py +++ b/src/adapt/contracts/projection.py @@ -46,3 +46,8 @@ def assert_projected(ds: xr.Dataset, max_steps: int = 5) -> None: f"Projection contract violated: found {num_steps} steps, expected {expected_steps} " f"(1 registration + {max_steps_actual} projections from config)", ) + + +def check_projected_ds(ds: xr.Dataset) -> None: + """Bound contract for the standard projected dataset.""" + assert_projected(ds) diff --git a/src/adapt/contracts/segmentation.py b/src/adapt/contracts/segmentation.py index f09d560..961f917 100644 --- a/src/adapt/contracts/segmentation.py +++ b/src/adapt/contracts/segmentation.py @@ -48,3 +48,8 @@ def assert_segmented(ds: xr.Dataset, labels_name: str) -> None: labels.ndim == 2, f"Segmentation contract violated: '{labels_name}' has {labels.ndim} dims, expected 2", ) + + +def check_segmented_ds(ds: xr.Dataset) -> None: + """Bound contract for the standard segmented dataset (cell_labels variable name fixed).""" + assert_segmented(ds, "cell_labels") diff --git a/src/adapt/contracts/time.py b/src/adapt/contracts/time.py new file mode 100644 index 0000000..e5aae92 --- /dev/null +++ b/src/adapt/contracts/time.py @@ -0,0 +1,59 @@ +# Copyright © 2026, UChicago Argonne, LLC +# See LICENSE for terms and disclaimer. + +"""Time coordinate contract. + +Enforces that datasets crossing module boundaries carry a normalized time +coordinate — numpy datetime64, not cftime — so no module needs its own +conversion logic. Modules that read raw radar data (e.g. ingest) are +responsible for calling adapt.utils.time.normalize_time_scalar before +returning a dataset to the context. +""" + +import numpy as np +import xarray as xr + +from adapt.contracts.pipeline import require + + +def assert_time_normalized(ds: xr.Dataset) -> None: + """Enforce that the dataset time coordinate is numpy-compatible, not cftime. + + Parameters + ---------- + ds : xr.Dataset + Dataset crossing a module boundary. + + Raises + ------ + ContractViolation + If the dataset has no time coordinate, or if the time coordinate + uses cftime objects instead of numpy datetime64. + """ + require( + "time" in ds.coords or hasattr(ds, "attrs") and "time" in ds.attrs, + "Time contract violated: dataset has no 'time' coordinate or attribute. " + "Every dataset crossing a module boundary must carry a time stamp.", + ) + + if "time" in ds.coords: + raw = ds.coords["time"].values + tv = raw.flat[0] if isinstance(raw, np.ndarray) and raw.ndim > 0 else raw + # unwrap numpy scalar wrapper if needed + if hasattr(tv, "item"): + try: + tv = tv.item() + except Exception: + pass + module = getattr(type(tv), "__module__", "") + require( + not module.startswith("cftime"), + f"Time contract violated: time coordinate is cftime ({type(tv).__name__}). " + "Normalize with adapt.utils.time.normalize_time_scalar before returning " + "the dataset from the ingest or any adapter layer.", + ) + + +def check_time_normalized(ds: xr.Dataset) -> None: + """Bound contract for time normalization — use in module input_contracts.""" + assert_time_normalized(ds) diff --git a/src/adapt/contracts/tracking.py b/src/adapt/contracts/tracking.py index c81b51f..6a95be6 100644 --- a/src/adapt/contracts/tracking.py +++ b/src/adapt/contracts/tracking.py @@ -92,3 +92,15 @@ def assert_cell_events(df: pd.DataFrame) -> None: f"Cell events contract violated: invalid event_type present " f"(valid={sorted(_VALID_EVENT_TYPES)})", ) + + +def check_tracked_cells(df: pd.DataFrame) -> None: + """Bound contract for tracked cells DataFrame (skips validation on empty frame).""" + if not df.empty: + assert_tracked_cells(df) + + +def check_cell_events(df: pd.DataFrame) -> None: + """Bound contract for cell events DataFrame (skips validation on empty frame).""" + if not df.empty: + assert_cell_events(df) From e90c0182e36d066a37eaaa453623b901f1edbc37 Mon Sep 17 00:00:00 2001 From: Bhupendra Raut Date: Thu, 14 May 2026 14:13:29 -0500 Subject: [PATCH 6/6] FIX: fix processor spacing --- src/adapt/execution/graph/executor.py | 2 +- src/adapt/runtime/processor.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/adapt/execution/graph/executor.py b/src/adapt/execution/graph/executor.py index 6bde5a1..498b24d 100644 --- a/src/adapt/execution/graph/executor.py +++ b/src/adapt/execution/graph/executor.py @@ -69,7 +69,7 @@ def run(self, context: dict) -> dict: while len(completed) < len(self.nodes): iteration += 1 - if iteration > max_iterations: + if iteration > max_iterations: # pragma: no cover — preempted by no-progress check pending = [n.name for n in self.nodes if n.name not in completed] raise RuntimeError( f"Execution graph appears to contain a cycle or unresolvable " diff --git a/src/adapt/runtime/processor.py b/src/adapt/runtime/processor.py index fee6fb8..2b23db1 100644 --- a/src/adapt/runtime/processor.py +++ b/src/adapt/runtime/processor.py @@ -86,13 +86,13 @@ def __init__( ): super().__init__(daemon=True, name=name) - self.input_queue = input_queue - self.config = config - self.output_dirs = {k: Path(v) for k, v in output_dirs.items()} + self.input_queue = input_queue + self.config = config + self.output_dirs = {k: Path(v) for k, v in output_dirs.items()} self.file_tracker = file_tracker - self.repository = repository - self._stop_event = threading.Event() - self.output_lock = threading.Lock() + self.repository = repository + self._stop_event = threading.Event() + self.output_lock = threading.Lock() if not self.repository: raise ValueError(