From d9cfca3a0643b93165898d134cb7a1a0c989ba70 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Fri, 6 Mar 2026 15:02:47 -0600 Subject: [PATCH 01/14] BUMP: Version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ffd1457..8d6036d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "radclss" -version = "2026.3.6" +version = "2026.3.6.1" description = "Extracted Radar Columns and In Situ Sensors" readme = "README.md" requires-python = ">=3.10" From c10bfabcbebdaff648d3951695cd7a9fde4f4828 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Tue, 24 Mar 2026 09:41:08 -0500 Subject: [PATCH 02/14] TST: No python 3.14 support yet. Right now we just test up to py 3.13 since cartopy does not work. --- .github/workflows/run-test-suite.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run-test-suite.yml b/.github/workflows/run-test-suite.yml index 5dd4fef..bc4f32d 100644 --- a/.github/workflows/run-test-suite.yml +++ b/.github/workflows/run-test-suite.yml @@ -23,7 +23,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] + python-version: ["3.10", "3.11", "3.12", "3.13"] os: [macOS, ubuntu] include: - os: macos-latest From 0e3ba79cff58c5e866550d9d6c13523e05ecd873 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Wed, 25 Mar 2026 15:42:28 -0500 Subject: [PATCH 03/14] FIX: Some changes to make garbage collection better. --- src/radclss/util/column_utils.py | 69 +++++++++++++++++--------------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/src/radclss/util/column_utils.py b/src/radclss/util/column_utils.py index 50f689d..9f5153c 100644 --- a/src/radclss/util/column_utils.py +++ b/src/radclss/util/column_utils.py @@ -95,29 +95,32 @@ def get_nexrad_column( time_list = np.array(time_list) path = f"s3://{bucket_name}/" + file_list[np.argmin(np.abs(time_list - right_now))] radar_obj = pyart.io.read_nexrad_archive(path) - column_list = [] - for lat, lon in zip(lats, lons): - # Make sure we are interpolating from the radar's location above sea level - # NOTE: interpolating throughout Troposphere to match sonde to in the future - - da = pyart.util.columnsect.column_vertical_profile(radar_obj, lat, lon) - # check for valid heights - valid = np.isfinite(da["height"]) - n_valid = int(valid.sum()) - if n_valid > 0: - da = da.sel(height=valid).sortby("height").interp(height=height_bins) - else: - target_height = xr.DataArray(height_bins, dims="height", name="height") - da = da.reindex(height=target_height) - - # Add the latitude and longitude of the extracted column - da["lat"], da["lon"] = lat, lon - # Convert timeoffsets to timedelta object and precision on datetime64 - da.time_offset.data = da.time_offset.values.astype("timedelta64[s]") - da.base_time.data = da.base_time.values.astype("datetime64[s]") - # Time is based off the start of the radar volume - da["gate_time"] = da.base_time.values + da.isel(height=0).time_offset.values - column_list.append(da) + try: + column_list = [] + for lat, lon in zip(lats, lons): + # Make sure we are interpolating from the radar's location above sea level + # NOTE: interpolating throughout Troposphere to match sonde to in the future + + da = pyart.util.columnsect.column_vertical_profile(radar_obj, lat, lon) + # check for valid heights + valid = np.isfinite(da["height"]) + n_valid = int(valid.sum()) + if n_valid > 0: + da = da.sel(height=valid).sortby("height").interp(height=height_bins) + else: + target_height = xr.DataArray(height_bins, dims="height", name="height") + da = da.reindex(height=target_height) + + # Add the latitude and longitude of the extracted column + da["lat"], da["lon"] = lat, lon + # Convert timeoffsets to timedelta object and precision on datetime64 + da.time_offset.data = da.time_offset.values.astype("timedelta64[s]") + da.base_time.data = da.base_time.values.astype("datetime64[s]") + # Time is based off the start of the radar volume + da["gate_time"] = da.base_time.values + da.isel(height=0).time_offset.values + column_list.append(da) + finally: + del radar_obj # Concatenate the extracted radar columns for this scan across all sites ds = xr.concat([data for data in column_list if data], dim="station") @@ -184,12 +187,14 @@ def subset_points( f"{nfile} failed to open and is possibly corrupt." + "RadCLss will not generate a column for this file." ) - # Check for single sweep scans - if np.ma.is_masked(radar.sweep_start_ray_index["data"][1:]): - radar.sweep_start_ray_index["data"] = np.ma.array([0]) - radar.sweep_end_ray_index["data"] = np.ma.array([radar.nrays]) + return ds + + try: + # Check for single sweep scans + if np.ma.is_masked(radar.sweep_start_ray_index["data"][1:]): + radar.sweep_start_ray_index["data"] = np.ma.array([0]) + radar.sweep_end_ray_index["data"] = np.ma.array([radar.nrays]) - if radar: if radar.time["data"].size > 0: # Easier to map the nearest sonde file to radar gates before extraction if sonde is not None: @@ -279,11 +284,9 @@ def subset_points( # Concatenate the extracted radar columns for this scan across all sites ds = xr.concat([data for data in column_list if data], dim="station") ds = _add_station_vars(ds, sites, site_alt) - # delete the radar to free up memory - del radar, column_list, da - else: - # delete the rhi file - del radar + del column_list, da + finally: + del radar return ds From 1ab2c66407b714ebc4e9c8aa52caafbc7a83e8e0 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Wed, 25 Mar 2026 16:17:03 -0500 Subject: [PATCH 04/14] FIX: Explicitly close HDF5/NetCDF file handles to prevent handle exhaustion in parallel runs Close xarray datasets opened via act.io.read_arm_netcdf and xr.open_dataset immediately after data is loaded into memory, rather than relying on garbage collection, which does not run frequently enough in parallel workers. Co-Authored-By: Claude Sonnet 4.6 --- src/radclss/util/column_utils.py | 10 +++++++--- src/radclss/vis/quicklooks.py | 3 +++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/radclss/util/column_utils.py b/src/radclss/util/column_utils.py index 812edc1..e771d6a 100644 --- a/src/radclss/util/column_utils.py +++ b/src/radclss/util/column_utils.py @@ -248,6 +248,7 @@ def subset_points( ] radar.fields["sonde_" + var]["datastream"] = ds_sonde.datastream + ds_sonde.close() del radar_start, sonde_start, ds_sonde del z_dict, sonde_dict @@ -357,9 +358,12 @@ def match_datasets_act( grd_ds = ground else: # Read in the file using ACT - grd_ds = act.io.read_arm_netcdf(ground, cleanup_qc=True, drop_variables=discard) - # Default are Lazy Arrays; convert for matching with column - grd_ds = grd_ds.compute() + _grd_raw = act.io.read_arm_netcdf( + ground, cleanup_qc=True, drop_variables=discard + ) + # Default are Lazy Arrays; convert for matching with column, then close the file handle + grd_ds = _grd_raw.compute() + _grd_raw.close() # check if a list containing new variable names exists. if prefix: grd_ds = grd_ds.rename_vars({v: f"{prefix}{v}" for v in grd_ds.data_vars}) diff --git a/src/radclss/vis/quicklooks.py b/src/radclss/vis/quicklooks.py index 78f36a0..a951dfe 100644 --- a/src/radclss/vis/quicklooks.py +++ b/src/radclss/vis/quicklooks.py @@ -102,6 +102,9 @@ def create_radclss_columns( ) ).plot(y="height", ax=axarr[row, col], vmin=vmin, vmax=vmax, **kwargs) + if isinstance(radclss, str): + ds.close() + return fig, axarr From e2e30423632b5402a12498f5e0b8905ccde9f1e5 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Wed, 25 Mar 2026 16:46:04 -0500 Subject: [PATCH 05/14] DIAG: Log open HDF5 handles at key points to trace file descriptor leaks Adds _log_open_hdf5() using h5py's low-level API, called after each radar file is processed (subset_points, get_nexrad_column) and after each parallel collection loop in radclss_core, so that leaking handles can be pinpointed from WARNING-level log output. Co-Authored-By: Claude Sonnet 4.6 --- src/radclss/core/radclss_core.py | 11 ++++++++++- src/radclss/util/column_utils.py | 20 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/radclss/core/radclss_core.py b/src/radclss/core/radclss_core.py index 635c9c1..80bd48f 100644 --- a/src/radclss/core/radclss_core.py +++ b/src/radclss/core/radclss_core.py @@ -5,7 +5,12 @@ import numpy as np import pandas as pd -from ..util.column_utils import subset_points, match_datasets_act, get_nexrad_column +from ..util.column_utils import ( + subset_points, + match_datasets_act, + get_nexrad_column, + _log_open_hdf5, +) from ..config.default_config import DEFAULT_DISCARD_VAR from ..config.output_config import get_output_config from dask.distributed import Client, as_completed @@ -209,6 +214,8 @@ def radclss( f" Finished {k}: {successful}/{len(columns[k])} successful extractions" ) + _log_open_hdf5("radclss main process after radar column extraction") + # Assemble individual columns into single DataSet # try: # Concatenate all extracted columns across time dimension to form daily timeseries @@ -325,6 +332,8 @@ def _get_nexrad_wrapper(time_str): else: nexrad_columns = None + _log_open_hdf5("radclss main process after NEXRAD extraction") + if verbose: print("\n" + "=" * 80) print("STEP 4: Concatenating and processing time coordinates") diff --git a/src/radclss/util/column_utils.py b/src/radclss/util/column_utils.py index e771d6a..9512ccb 100644 --- a/src/radclss/util/column_utils.py +++ b/src/radclss/util/column_utils.py @@ -1,4 +1,5 @@ import boto3 +import h5py import pyart import act import numpy as np @@ -14,6 +15,23 @@ from ..config import get_output_config +def _log_open_hdf5(label=""): + """Log all currently open HDF5 file handles via h5py's low-level API.""" + try: + fids = h5py.h5f.get_obj_ids(types=h5py.h5f.OBJ_FILE) + logging.warning("%s: %d open HDF5 handle(s)", label, len(fids)) + for fid in fids: + try: + name = h5py.h5f.get_name(fid) + logging.warning( + " %s", name.decode() if isinstance(name, bytes) else name + ) + except Exception: + logging.warning(" ", fid) + except Exception as exc: + logging.warning("HDF5 diagnostic unavailable: %s", exc) + + def get_nexrad_column( rad_time, site, @@ -121,6 +139,7 @@ def get_nexrad_column( column_list.append(da) finally: del radar_obj + _log_open_hdf5("get_nexrad_column after del radar_obj") # Concatenate the extracted radar columns for this scan across all sites ds = xr.concat([data for data in column_list if data], dim="station") @@ -288,6 +307,7 @@ def subset_points( del column_list, da finally: del radar + _log_open_hdf5("subset_points after del radar") return ds From 5f459cc7be1649adb3f810472503bb86a3389d17 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Wed, 25 Mar 2026 17:14:30 -0500 Subject: [PATCH 06/14] PERF: Speed up serial processing with sonde cache, thread pools, and chunked concat - Cache sonde dataset reads across radar files (_read_sonde_cached): the same sonde launch is re-used by many consecutive scans, so each unique file is now read and computed only once per process lifetime. - Split match_datasets_act into _prepare_match (I/O + resample) and _apply_match (in-place merge). The prepare step is now run concurrently for all ground instruments via ThreadPoolExecutor; the apply step remains sequential. - Replace the serial NEXRAD time-list loop with a ThreadPoolExecutor: fetches are pure network I/O so they parallelise well without multiprocessing overhead. - Replace single large xr.concat(columns[k]) with a chunked tree-reduce concat (chunk size 50) to bound peak memory during the assembly step. Co-Authored-By: Claude Sonnet 4.6 --- src/radclss/core/radclss_core.py | 225 ++++++++++++++++++++----------- src/radclss/util/column_utils.py | 199 +++++++++++++++------------ 2 files changed, 259 insertions(+), 165 deletions(-) diff --git a/src/radclss/core/radclss_core.py b/src/radclss/core/radclss_core.py index 80bd48f..e984b6f 100644 --- a/src/radclss/core/radclss_core.py +++ b/src/radclss/core/radclss_core.py @@ -4,12 +4,15 @@ import act import numpy as np import pandas as pd +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed as futures_as_completed from ..util.column_utils import ( subset_points, - match_datasets_act, get_nexrad_column, _log_open_hdf5, + _prepare_match, + _apply_match, ) from ..config.default_config import DEFAULT_DISCARD_VAR from ..config.output_config import get_output_config @@ -310,17 +313,27 @@ def _get_nexrad_wrapper(time_str): ) else: if verbose: - print(" Processing NEXRAD columns in serial mode...") - for i, time_str in enumerate(time_list, 1): - if verbose and i % 5 == 0: - print(f" [{i}/{len(time_list)}] Fetching NEXRAD for {time_str}") - nexrad_columns.append( - get_nexrad_column( - time_str, + print(f" Fetching {len(time_list)} NEXRAD columns via thread pool...") + with ThreadPoolExecutor() as pool: + futs = { + pool.submit( + get_nexrad_column, + t, output_config["site"], input_site_dict, - ) - ) + nexrad_site, + ): t + for t in time_list + } + for fut in futures_as_completed(futs): + try: + result = fut.result() + if result is not None: + nexrad_columns.append(result) + except Exception as exc: + logging.warning( + "NEXRAD fetch failed for %s: %s", futs[fut], exc + ) if verbose: valid_nexrad = sum(1 for x in nexrad_columns if x is not None) @@ -342,12 +355,20 @@ def _get_nexrad_wrapper(time_str): output_platform = output_config["platform"] output_level = output_config["level"] - # Convert time variables to something xarray understands + # Convert time variables to something xarray understands. + # Concat in chunks of 50 so only a fraction of per-file datasets are held in + # memory at once before being folded into an intermediate result. + _CONCAT_CHUNK = 50 ds_concat = {} for k in columns.keys(): if verbose: print(f" Processing {k}...") - ds_concat[k] = xr.concat([data for data in columns[k] if data], dim="time") + valid = [data for data in columns[k] if data is not None] + chunks = [ + xr.concat(valid[i : i + _CONCAT_CHUNK], dim="time") + for i in range(0, len(valid), _CONCAT_CHUNK) + ] + ds_concat[k] = xr.concat(chunks, dim="time") if len(chunks) > 1 else chunks[0] if verbose: print( f" Concatenated dimensions: time={ds_concat[k].dims['time']}, station={ds_concat[k].dims['station']}, height={ds_concat[k].dims['height']}" @@ -506,7 +527,7 @@ def _get_nexrad_wrapper(time_str): if verbose: print(" Merging NEXRAD data into combined dataset...") ds_concat = xr.merge([ds_concat, nexrad_columns]) - + if verbose: print(f" Total variables in merged dataset: {len(ds_concat.data_vars)}") print("\n" + "=" * 80) @@ -620,8 +641,7 @@ def _get_nexrad_wrapper(time_str): ) current_client.restart() del ds_concat - - + # Free up Memory del columns @@ -636,7 +656,10 @@ def _get_nexrad_wrapper(time_str): print("=" * 80) print(f" Radar processing completed at: {time.strftime('%H:%M:%S')}") - # Find all of the met stations and match to columns + # Build a task list: (label, site, _prepare_match kwargs) for every instrument. + # All entries are independent so we can load+resample them concurrently, then + # apply results sequentially into ds. + _instrument_tasks = [] vol_keys = list(volumes.keys()) for k in vol_keys: if k == "sonde": @@ -650,76 +673,124 @@ def _get_nexrad_wrapper(time_str): else: instrument = k site = base_station + site = site.upper() if instrument == "kazr2": - if verbose: - print(f"Matching KAZR2 data for site: {site}") - ds = match_datasets_act( - ds, - volumes[k], - site.upper(), - discard=discard_var["kazr2"], - resample="mean", - prefix="kazr2_", - verbose=verbose, + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["kazr2"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + prefix="kazr2_", + ), + ) ) - if instrument == "met": - if verbose: - print(f"Matching MET data for site: {site}") - ds = match_datasets_act( - ds, - volumes[k][0], - site.upper(), - resample="mean", - discard=discard_var["met"], - verbose=verbose, + elif instrument == "met": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k][0], + site=site, + discard=discard_var["met"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + ), + ) ) - - if instrument == "pluvio": - # Weighing Bucket Rain Gauge - ds = match_datasets_act( - ds, - volumes[k], - site.upper(), - resample="sum", - discard=discard_var["pluvio"], - verbose=verbose, + elif instrument == "pluvio": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["pluvio"], + column_time=ds.time, + column_height=ds["height"], + resample="sum", + ), + ) ) - - if instrument == "ld": - ds = match_datasets_act( - ds, - volumes[k], - site.upper(), - discard=discard_var["ldquants"], - resample="mean", - prefix="ldquants_", - verbose=verbose, + elif instrument == "ld": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["ldquants"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + prefix="ldquants_", + ), + ) ) - - if instrument == "vd": - # Laser Disdrometer - Supplemental Site - ds = match_datasets_act( - ds, - volumes[k], - site.upper(), - discard=discard_var["vdisquants"], - resample="mean", - prefix="vdisquants_", - verbose=verbose, + elif instrument == "vd": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["vdisquants"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + prefix="vdisquants_", + ), + ) ) - - if instrument == "wxt": - # Laser Disdrometer - Supplemental Site - ds = match_datasets_act( - ds, - volumes[k], - site.upper(), - discard=discard_var["wxt"], - resample="mean", - verbose=verbose, + elif instrument == "wxt": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["wxt"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + ), + ) ) + if verbose: + print( + f" Loading {len(_instrument_tasks)} ground instrument file(s) in parallel..." + ) + + # Parallel I/O + resample — each task reads its own files independently + with ThreadPoolExecutor() as pool: + _futs = { + pool.submit(_prepare_match, **kwargs): (k, site) + for k, site, kwargs in _instrument_tasks + } + + # Sequential apply — in-place writes to ds must not overlap + for fut, (k, site) in _futs.items(): + try: + _, matched = fut.result() + ds = _apply_match(ds, site, matched) + if verbose: + print(f" Matched {k} for site {site}") + except Exception as exc: + logging.warning("Failed to match %s for site %s: %s", k, site, exc) + else: # There is no column extraction raise RuntimeError(": RadCLss FAILURE (All Columns Failed to Extract): ") diff --git a/src/radclss/util/column_utils.py b/src/radclss/util/column_utils.py index 9512ccb..4642dc4 100644 --- a/src/radclss/util/column_utils.py +++ b/src/radclss/util/column_utils.py @@ -14,6 +14,17 @@ from ..config import DEFAULT_DISCARD_VAR, DEFAULT_NEXRAD_RADARS from ..config import get_output_config +_sonde_cache = {} + + +def _read_sonde_cached(path, exclude): + """Return a fully-loaded sonde dataset, reading from disk only on first call.""" + if path not in _sonde_cache: + raw = act.io.read_arm_netcdf(path, cleanup_qc=True, drop_variables=exclude) + _sonde_cache[path] = raw.compute() + raw.close() + return _sonde_cache[path] + def _log_open_hdf5(label=""): """Log all currently open HDF5 file handles via h5py's low-level API.""" @@ -239,12 +250,9 @@ def subset_points( # difference in time between radar file and each sonde file start_diff = [radar_start - sonde for sonde in sonde_start] - # merge the sonde file into the radar object - ds_sonde = act.io.read_arm_netcdf( - sonde[start_diff.index(min(start_diff))], - cleanup_qc=True, - drop_variables=exclude_sonde, - ) + # merge the sonde file into the radar object (cached across radar files) + sonde_path = sonde[start_diff.index(min(start_diff))] + ds_sonde = _read_sonde_cached(sonde_path, exclude_sonde) # create list of variables within sonde dataset to add to the radar file for var in list(ds_sonde.keys()): @@ -267,7 +275,6 @@ def subset_points( ] radar.fields["sonde_" + var]["datastream"] = ds_sonde.datastream - ds_sonde.close() del radar_start, sonde_start, ds_sonde del z_dict, sonde_dict @@ -311,145 +318,86 @@ def subset_points( return ds -def match_datasets_act( - column, +def _prepare_match( ground, site, discard, + column_time, + column_height, resample="sum", resample_time="5Min", DataSet=False, prefix=None, - verbose=False, ): """ - Time synchronization of a Ground Instrumentation Dataset to - a Radar Column for Specific Locations using the ARM ACT package. - This module also supports vertically pointing radars such as the KAZR. - - Parameters - ---------- - column : Xarray DataSet - Xarray DataSet containing the extracted radar column above multiple locations. - Dimensions should include Time, Height, Site + Load a ground instrument file, resample it to the column time/height grid, + and return ``(site, matched_dataset)`` without modifying the column in-place. - ground : str; Xarray DataSet - String containing the path of the ground instrumentation file that is desired - to be included within the extracted radar column dataset. - If DataSet is set to True, ground is Xarray Dataset and will skip I/O. - - site : str - Location of the ground instrument. Should be included within the filename. - - discard : list - List containing the desired input ground instrumentation variables to be - removed from the xarray DataSet. - - resample : str - Mathematical operational for resampling ground instrumentation to the radar time. - 'sum' will sum the ground data within the resample_time window, - 'mean' will average the ground data within the resample_time window, - and 'skip' will not resample the data and will only interpolate to the radar time. - Default is 'sum'. - - resample_time : str - Time resolution for resampling ground instrumentation data before mapping to radar time. - Default is "5Min". - - DataSet : boolean - Boolean flag to determine if ground input is an Xarray Dataset. - Set to True if ground input is Xarray DataSet. - - prefix : str - prefix for the desired spelling of variable names for the input - datastream (to fix duplicate variable names between instruments) - - verbose : boolean - Boolean flag to set verbose output during processing. Default is False. - - Returns - ------- - ds : Xarray DataSet - Xarray Dataset containing the time-synced in-situ ground observations with - the inputed radar column + Safe to call concurrently from multiple threads. """ - # Check to see if input is xarray DataSet or a file path if DataSet: grd_ds = ground else: - # Read in the file using ACT _grd_raw = act.io.read_arm_netcdf( ground, cleanup_qc=True, drop_variables=discard ) - # Default are Lazy Arrays; convert for matching with column, then close the file handle grd_ds = _grd_raw.compute() _grd_raw.close() - # check if a list containing new variable names exists. if prefix: grd_ds = grd_ds.rename_vars({v: f"{prefix}{v}" for v in grd_ds.data_vars}) - # Remove Base_Time before Resampling Data since you can't force 1 datapoint to 5 min sum if "base_time" in grd_ds.data_vars: del grd_ds["base_time"] - # Check to see if height is a dimension within the ground instrumentation. - # If so, first interpolate heights to match radar, before interpolating time. if "height" in grd_ds.dims: - grd_ds = grd_ds.interp(height=column["height"], method="linear") + grd_ds = grd_ds.interp(height=column_height, method="linear") if "range" in grd_ds.dims: - grd_ds = grd_ds.interp(range=column["height"], method="linear") + grd_ds = grd_ds.interp(range=column_height, method="linear") grd_ds = grd_ds.drop_vars("height") grd_ds = grd_ds.rename({"range": "height"}) - # Resample the ground data to 5 min and interpolate to the radar time. - # Keep data variable attributes to help distingish between instruments/locations - # Keep only numeric data variables to avoid issues with resampling non-numeric variables (e.g. lat/lon) non_numeric_vars = [ var for var in grd_ds.data_vars if not np.issubdtype(grd_ds[var].dtype, np.number) ] - grd_ds = grd_ds.drop_vars(non_numeric_vars) + if resample == "mean": matched = ( grd_ds.resample(time=resample_time, closed="right") .mean(keep_attrs=True) - .interp(time=column.time, method="linear") + .interp(time=column_time, method="linear") ) elif resample == "skip": - matched = grd_ds.interp(time=column.time, method="linear") + matched = grd_ds.interp(time=column_time, method="linear") elif resample == "sum": matched = ( grd_ds.resample(time=resample_time, closed="right") .sum(keep_attrs=True) - .interp(time=column.time, method="linear") + .interp(time=column_time, method="linear") ) else: raise ValueError( "Invalid resample method. Please choose 'mean', 'sum', or 'skip'." ) - # Add site location as a dimension for the Pluvio data matched = matched.assign_coords(coords=dict(station=site)) matched = matched.expand_dims("station") - # Remove Lat/Lon Data variables as it is included within the Matched Dataset with Site Identfiers - if "lat" in matched.data_vars: - del matched["lat"] - if "lon" in matched.data_vars: - del matched["lon"] - if "alt" in matched.data_vars: - del matched["alt"] - - # Update the individual Variables to Hold Global Attributes - # global attributes will be lost on merging into the matched dataset. - # Need to keep as many references and descriptors as possible + for attr in ("lat", "lon", "alt"): + if attr in matched.data_vars: + del matched[attr] + for var in matched.data_vars: matched[var].attrs.update(source=matched.datastream) - # Merge the two DataSets + return site, matched + + +def _apply_match(column, site, matched): + """Merge a prepared match result into ``column`` in-place and return it.""" for k in matched.data_vars: if k in column.data_vars: column[k].sel(station=site)[:] = matched.sel(station=site)[k][:].astype( @@ -469,10 +417,85 @@ def match_datasets_act( column[k] = ( column[k].fillna(column[k].attrs["missing_value"]).astype(float) ) - grd_ds.close() return column +def match_datasets_act( + column, + ground, + site, + discard, + resample="sum", + resample_time="5Min", + DataSet=False, + prefix=None, + verbose=False, +): + """ + Time synchronization of a Ground Instrumentation Dataset to + a Radar Column for Specific Locations using the ARM ACT package. + This module also supports vertically pointing radars such as the KAZR. + + Parameters + ---------- + column : Xarray DataSet + Xarray DataSet containing the extracted radar column above multiple locations. + Dimensions should include Time, Height, Site + + ground : str; Xarray DataSet + String containing the path of the ground instrumentation file that is desired + to be included within the extracted radar column dataset. + If DataSet is set to True, ground is Xarray Dataset and will skip I/O. + + site : str + Location of the ground instrument. Should be included within the filename. + + discard : list + List containing the desired input ground instrumentation variables to be + removed from the xarray DataSet. + + resample : str + Mathematical operational for resampling ground instrumentation to the radar time. + 'sum' will sum the ground data within the resample_time window, + 'mean' will average the ground data within the resample_time window, + and 'skip' will not resample the data and will only interpolate to the radar time. + Default is 'sum'. + + resample_time : str + Time resolution for resampling ground instrumentation data before mapping to radar time. + Default is "5Min". + + DataSet : boolean + Boolean flag to determine if ground input is an Xarray Dataset. + Set to True if ground input is Xarray DataSet. + + prefix : str + prefix for the desired spelling of variable names for the input + datastream (to fix duplicate variable names between instruments) + + verbose : boolean + Boolean flag to set verbose output during processing. Default is False. + + Returns + ------- + ds : Xarray DataSet + Xarray Dataset containing the time-synced in-situ ground observations with + the inputed radar column + """ + _, matched = _prepare_match( + ground, + site, + discard, + column.time, + column["height"], + resample=resample, + resample_time=resample_time, + DataSet=DataSet, + prefix=prefix, + ) + return _apply_match(column, site, matched) + + def _add_station_vars(ds, sites, site_alt): ds["station"] = sites # Assign the Main and Supplemental Site altitudes From 8da1aa7d92790e292dcd7d144bf7a59de001ae46 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Thu, 26 Mar 2026 18:50:13 +0000 Subject: [PATCH 07/14] FIX: Remove HDF5 tracking code since it broke the workflow. --- src/radclss/core/radclss_core.py | 56 ++++++++++++++------------------ src/radclss/util/column_utils.py | 24 +++----------- 2 files changed, 28 insertions(+), 52 deletions(-) diff --git a/src/radclss/core/radclss_core.py b/src/radclss/core/radclss_core.py index e984b6f..5fdf723 100644 --- a/src/radclss/core/radclss_core.py +++ b/src/radclss/core/radclss_core.py @@ -10,7 +10,6 @@ from ..util.column_utils import ( subset_points, get_nexrad_column, - _log_open_hdf5, _prepare_match, _apply_match, ) @@ -217,7 +216,6 @@ def radclss( f" Finished {k}: {successful}/{len(columns[k])} successful extractions" ) - _log_open_hdf5("radclss main process after radar column extraction") # Assemble individual columns into single DataSet # try: @@ -286,7 +284,7 @@ def _get_nexrad_wrapper(time_str): input_site_dict, nexrad_radar=nexrad_site, ) - + results = current_client.map(_get_nexrad_wrapper, time_list) successful_count = 0 @@ -314,26 +312,30 @@ def _get_nexrad_wrapper(time_str): else: if verbose: print(f" Fetching {len(time_list)} NEXRAD columns via thread pool...") - with ThreadPoolExecutor() as pool: - futs = { - pool.submit( - get_nexrad_column, - t, - output_config["site"], - input_site_dict, - nexrad_site, - ): t - for t in time_list - } - for fut in futures_as_completed(futs): - try: - result = fut.result() - if result is not None: - nexrad_columns.append(result) - except Exception as exc: - logging.warning( - "NEXRAD fetch failed for %s: %s", futs[fut], exc + def _get_nexrad_wrapper(time_str): + return get_nexrad_column( + time_str, + output_config["site"], + input_site_dict, + nexrad_radar=nexrad_site, + ) + successful_count = 0 + failed_count = 0 + for t in time_list: + try: + nexrad_columns.append(_get_nexrad_wrapper(t)) + successful_count += 1 + if verbose and successful_count % 5 == 0: + print( + f" Completed {successful_count}/{len(time_list)} NEXRAD columns..." + ) + except Exception as error: + failed_count += 1 + if verbose: + print( + f" ERROR fetching NEXRAD data (total failures: {failed_count})" ) + logging.exception(error) if verbose: valid_nexrad = sum(1 for x in nexrad_columns if x is not None) @@ -345,7 +347,6 @@ def _get_nexrad_wrapper(time_str): else: nexrad_columns = None - _log_open_hdf5("radclss main process after NEXRAD extraction") if verbose: print("\n" + "=" * 80) @@ -631,15 +632,6 @@ def _get_nexrad_wrapper(time_str): if verbose: print("\n Freeing memory: deleting intermediate datasets...") ds_concat.close() - if serial is not True: - if current_client is None: - try: - current_client = Client.current() - except ValueError: - raise RuntimeError( - "No Dask client found. Please start a Dask client before running in parallel mode." - ) - current_client.restart() del ds_concat # Free up Memory diff --git a/src/radclss/util/column_utils.py b/src/radclss/util/column_utils.py index 4642dc4..5ac7b12 100644 --- a/src/radclss/util/column_utils.py +++ b/src/radclss/util/column_utils.py @@ -1,5 +1,4 @@ import boto3 -import h5py import pyart import act import numpy as np @@ -26,23 +25,6 @@ def _read_sonde_cached(path, exclude): return _sonde_cache[path] -def _log_open_hdf5(label=""): - """Log all currently open HDF5 file handles via h5py's low-level API.""" - try: - fids = h5py.h5f.get_obj_ids(types=h5py.h5f.OBJ_FILE) - logging.warning("%s: %d open HDF5 handle(s)", label, len(fids)) - for fid in fids: - try: - name = h5py.h5f.get_name(fid) - logging.warning( - " %s", name.decode() if isinstance(name, bytes) else name - ) - except Exception: - logging.warning(" ", fid) - except Exception as exc: - logging.warning("HDF5 diagnostic unavailable: %s", exc) - - def get_nexrad_column( rad_time, site, @@ -150,7 +132,6 @@ def get_nexrad_column( column_list.append(da) finally: del radar_obj - _log_open_hdf5("get_nexrad_column after del radar_obj") # Concatenate the extracted radar columns for this scan across all sites ds = xr.concat([data for data in column_list if data], dim="station") @@ -220,6 +201,10 @@ def subset_points( return ds try: + # Check for RHI and reduce to first sweep if > 1 sweep + if "rhi" in radar.scan_type: + radar = radar.extract_sweeps([0]) + # Check for single sweep scans if np.ma.is_masked(radar.sweep_start_ray_index["data"][1:]): radar.sweep_start_ray_index["data"] = np.ma.array([0]) @@ -314,7 +299,6 @@ def subset_points( del column_list, da finally: del radar - _log_open_hdf5("subset_points after del radar") return ds From 3c9997fd92f53404f753e6d68b8e051f6510ffda Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Tue, 7 Apr 2026 15:22:38 +0000 Subject: [PATCH 08/14] ADD: Support for KAZR, NEXRAD, and interpsonde. Bugfixes. --- pyproject.toml | 2 +- src/radclss/config/default_config.py | 53 ------------------ src/radclss/config/output_config.py | 2 +- src/radclss/core/radclss_core.py | 84 +++++++++++++++++----------- src/radclss/util/column_utils.py | 77 ++++++++++++++++++++----- 5 files changed, 114 insertions(+), 104 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8d6036d..e261745 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "radclss" -version = "2026.3.6.1" +version = "2026.4.7" description = "Extracted Radar Columns and In Situ Sensors" readme = "README.md" requires-python = ">=3.10" diff --git a/src/radclss/config/default_config.py b/src/radclss/config/default_config.py index d789005..9a7603a 100644 --- a/src/radclss/config/default_config.py +++ b/src/radclss/config/default_config.py @@ -52,15 +52,12 @@ "height_over_iso0", ], "radar_kasacr": [ - "signal_to_noise_ratio_copolar_h", "signal_to_noise_ratio_copolar_v", ], "radar_xsacr": [ - "signal_to_noise_ratio_copolar_h", "signal_to_noise_ratio_copolar_v", ], "kazr2": [ - "signal_to_noise_ratio_copolar_h", "signal_to_noise_ratio_crosspolar_v", ], "radar_csapr2": [ @@ -76,7 +73,6 @@ "uncorrected_reflectivity_v", "uncorrected_spectral_width_h", "uncorrected_spectral_width_v", - "signal_to_noise_ratio_copolar_h", "signal_to_noise_ratio_copolar_v", "normalized_coherent_power", "normalized_coherent_power_v", @@ -165,33 +161,12 @@ "accum_total_nrt", ], "ldquants": [ - "specific_differential_attenuation_xband20c", - "specific_differential_attenuation_kaband20c", - "specific_differential_attenuation_sband20c", "bringi_conv_stra_flag", "exppsd_slope", "norm_num_concen", "num_concen", "gammapsd_shape", "gammapsd_slope", - "mean_doppler_vel_wband20c", - "mean_doppler_vel_kaband20c", - "mean_doppler_vel_xband20c", - "mean_doppler_vel_sband20c", - "specific_attenuation_kaband20c", - "specific_attenuation_xband20c", - "specific_attenuation_sband20c", - "specific_differential_phase_kaband20c", - "specific_differential_phase_xband20c", - "specific_differential_phase_sband20c", - "differential_reflectivity_kaband20c", - "differential_reflectivity_xband20c", - "differential_reflectivity_sband20c", - "reflectivity_factor_wband20c", - "reflectivity_factor_kaband20c", - "reflectivity_factor_xband20c", - "reflectivity_factor_sband20c", - "bringi_conv_stra_flag", "time_offset", "base_time", "lat", @@ -199,32 +174,12 @@ "alt", ], "vdisquants": [ - "specific_differential_attenuation_xband20c", - "specific_differential_attenuation_kaband20c", - "specific_differential_attenuation_sband20c", "bringi_conv_stra_flag", "exppsd_slope", "norm_num_concen", "num_concen", "gammapsd_shape", "gammapsd_slope", - "mean_doppler_vel_wband20c", - "mean_doppler_vel_kaband20c", - "mean_doppler_vel_xband20c", - "mean_doppler_vel_sband20c", - "specific_attenuation_kaband20c", - "specific_attenuation_xband20c", - "specific_attenuation_sband20c", - "specific_differential_phase_kaband20c", - "specific_differential_phase_xband20c", - "specific_differential_phase_sband20c", - "differential_reflectivity_kaband20c", - "differential_reflectivity_xband20c", - "differential_reflectivity_sband20c", - "reflectivity_factor_wband20c", - "reflectivity_factor_kaband20c", - "reflectivity_factor_xband20c", - "reflectivity_factor_sband20c", "bringi_conv_stra_flag", "time_offset", "base_time", @@ -237,19 +192,11 @@ "time_offset", "time_bounds", "qc_temp_mean", - "temp_std", - "rh_mean", "qc_rh_mean", - "rh_std", - "atmos_pressure", "qc_atmos_pressure", - "wspd_arith_mean", "qc_wspd_arith_mean", - "wspd_vec_mean", "qc_wspd_vec_mean", - "wdir_vec_mean", "qc_wdir_vec_mean", - "wdir_vec_std", "qc_wxt_precip_rate_mean", "qc_wxt_cumul_precip", "logger_volt", diff --git a/src/radclss/config/output_config.py b/src/radclss/config/output_config.py index 427fbda..ec1a9fd 100644 --- a/src/radclss/config/output_config.py +++ b/src/radclss/config/output_config.py @@ -1,7 +1,7 @@ OUTPUT_SITE = "BNF" OUTPUT_FACILITY = "M1" OUTPUT_PLATFORM = "radclss" -OUTPUT_LEVEL = "c2" +OUTPUT_LEVEL = "c0" OUTPUT_GATE_TIME_ATTRS = { "long_name": ( "Time in Seconds that Cooresponds to the Start" diff --git a/src/radclss/core/radclss_core.py b/src/radclss/core/radclss_core.py index 5fdf723..815ab63 100644 --- a/src/radclss/core/radclss_core.py +++ b/src/radclss/core/radclss_core.py @@ -4,6 +4,8 @@ import act import numpy as np import pandas as pd +import os + from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed as futures_as_completed @@ -130,6 +132,7 @@ def radclss( ) # Call Subset Points columns = {} + datastream_names = {} if verbose: print("\n" + "=" * 80) print("STEP 1: Extracting radar columns") @@ -153,7 +156,7 @@ def radclss( results = current_client.map( subset_points, volumes[k], - sonde=volumes["sonde"], + sonde=None, input_site_dict=input_site_dict, height_bins=height_bins, rad_key=k, @@ -175,7 +178,7 @@ def radclss( print( f" ERROR processing file (total failures: {failed_count})" ) - + datastream_names[k] = columns[k][-1].attrs["datastream"] if verbose: print( f" Finished {k}: {successful_count} successful, {failed_count} failed" @@ -194,14 +197,17 @@ def radclss( print( f" [{file_count}/{len(volumes[k])}] Processing: {rad.split('/')[-1]}" ) - result = subset_points( - rad, - sonde=volumes["sonde"], - input_site_dict=input_site_dict, - height_bins=height_bins, - rad_key=k, - ) - columns[k].append(result) + try: + result = subset_points( + rad, + sonde=None, + input_site_dict=input_site_dict, + height_bins=height_bins, + rad_key=k, + ) + columns[k].append(result) + except Exception as e: + result = None if verbose: if result is not None: print( @@ -634,8 +640,10 @@ def _get_nexrad_wrapper(time_str): ds_concat.close() del ds_concat - # Free up Memory del columns + if verbose and serial is False: + print("\n Restarting dask client.") + current_client.restart() # If successful column extraction, apply in-situ if ds: @@ -654,8 +662,6 @@ def _get_nexrad_wrapper(time_str): _instrument_tasks = [] vol_keys = list(volumes.keys()) for k in vol_keys: - if k == "sonde": - continue if len(volumes[k]) == 0: if verbose: print(f"No files found for instrument/site: {k}") @@ -757,32 +763,34 @@ def _get_nexrad_wrapper(time_str): column_time=ds.time, column_height=ds["height"], resample="mean", + prefix="wxt_", + ), + ) + ) + elif instrument == "sonde": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["sonde"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + prefix="sonde_", ), ) ) - if verbose: - print( - f" Loading {len(_instrument_tasks)} ground instrument file(s) in parallel..." - ) - - # Parallel I/O + resample — each task reads its own files independently - with ThreadPoolExecutor() as pool: - _futs = { - pool.submit(_prepare_match, **kwargs): (k, site) - for k, site, kwargs in _instrument_tasks - } - - # Sequential apply — in-place writes to ds must not overlap - for fut, (k, site) in _futs.items(): - try: - _, matched = fut.result() + results = [] + for k, site, kwargs in _instrument_tasks: + _, matched = _prepare_match(**kwargs) + if matched is not None: ds = _apply_match(ds, site, matched) - if verbose: - print(f" Matched {k} for site {site}") - except Exception as exc: - logging.warning("Failed to match %s for site %s: %s", k, site, exc) - + datastream_names[k] = matched.attrs["datastream"] + else: # There is no column extraction raise RuntimeError(": RadCLss FAILURE (All Columns Failed to Extract): ") @@ -796,6 +804,14 @@ def _get_nexrad_wrapper(time_str): del ds["base_time"].attrs["units"] del ds["time_offset"].attrs["units"] del ds["time"].attrs["units"] + + if verbose: + print(" Adding input datastream names to dataset attributes...") + for k, v in datastream_names.items(): + ds_type = k.split("_")[0] + for var in ds.data_vars: + if var.startswith(ds_type): + ds[var].attrs["input_datastream"] = v if verbose: print("\n" + "=" * 80) diff --git a/src/radclss/util/column_utils.py b/src/radclss/util/column_utils.py index 5ac7b12..24fe8d4 100644 --- a/src/radclss/util/column_utils.py +++ b/src/radclss/util/column_utils.py @@ -3,6 +3,7 @@ import act import numpy as np import xarray as xr +import pandas as pd import datetime import logging @@ -14,6 +15,7 @@ from ..config import get_output_config _sonde_cache = {} +_nexrad_cache = {} def _read_sonde_cached(path, exclude): @@ -24,6 +26,15 @@ def _read_sonde_cached(path, exclude): raw.close() return _sonde_cache[path] +def _read_nexrad_cache(path): + if path not in _nexrad_cache.keys(): + raw = pyart.io.read_nexrad_archive(path) + _nexrad_cache[path] = raw + # Only maintain 5 files in the cache to save memory + if len(_nexrad_cache.keys()) > 5: + first_key = next(iter(_nexrad_cache)) + del _nexrad_cache[first_key] + return _nexrad_cache[path] def get_nexrad_column( rad_time, @@ -105,7 +116,7 @@ def get_nexrad_column( time_list = np.array(time_list) path = f"s3://{bucket_name}/" + file_list[np.argmin(np.abs(time_list - right_now))] - radar_obj = pyart.io.read_nexrad_archive(path) + radar_obj = _read_nexrad_cache(path) try: column_list = [] for lat, lon in zip(lats, lons): @@ -136,6 +147,7 @@ def get_nexrad_column( # Concatenate the extracted radar columns for this scan across all sites ds = xr.concat([data for data in column_list if data], dim="station") ds = _add_station_vars(ds, sites, site_alt) + ds.attrs["nexrad_radar"] = nexrad_radar del column_list, da return ds @@ -204,7 +216,7 @@ def subset_points( # Check for RHI and reduce to first sweep if > 1 sweep if "rhi" in radar.scan_type: radar = radar.extract_sweeps([0]) - + # Check for single sweep scans if np.ma.is_masked(radar.sweep_start_ray_index["data"][1:]): radar.sweep_start_ray_index["data"] = np.ma.array([0]) @@ -236,7 +248,7 @@ def subset_points( start_diff = [radar_start - sonde for sonde in sonde_start] # merge the sonde file into the radar object (cached across radar files) - sonde_path = sonde[start_diff.index(min(start_diff))] + sonde_path = sonde[start_diff.index(min(abs(start_diff)))] ds_sonde = _read_sonde_cached(sonde_path, exclude_sonde) # create list of variables within sonde dataset to add to the radar file @@ -258,7 +270,7 @@ def subset_points( radar.fields["sonde_" + var]["standard_name"] = sonde_dict[ "standard_name" ] - radar.fields["sonde_" + var]["datastream"] = ds_sonde.datastream + radar.fields["sonde_" + var]["input_datastream"] = ds_sonde.datastream del radar_start, sonde_start, ds_sonde del z_dict, sonde_dict @@ -267,25 +279,50 @@ def subset_points( for lat, lon in zip(lats, lons): # Make sure we are interpolating from the radar's location above sea level # NOTE: interpolating throughout Troposphere to match sonde to in the future - - da = pyart.util.columnsect.column_vertical_profile(radar, lat, lon) + if not "rhi" in radar.scan_type: + da = pyart.util.columnsect.column_vertical_profile(radar, lat, lon) + else: + try: + da = pyart.util.get_field_location(radar, lat, lon) + except ValueError: + # If the columnsect fails, try adding 180 to the azimuths to account for potential mislabeling of radar location + radar.azimuth["data"] = radar.azimuth["data"] + 180 + # Need to adjust elevation as well to maintain the same relative geometry between radar and column locations + radar.elevation["data"] = 180 - radar.elevation["data"] + try: + da = pyart.util.get_field_location(radar, lat, lon) + except ValueError: + # NaNs will be returned if the columnsect fails again after adjusting the azimuths + da = pyart.util.columnsect.column_vertical_profile(radar, lat, lon) + + time_offset = da["time_offset"] # check for valid heights valid = np.isfinite(da["height"]) n_valid = int(valid.sum()) if n_valid > 0: - da = ( - da.sel(height=valid).sortby("height").interp(height=height_bins) - ) + try: + da = ( + da.sel(height=valid).sortby("height").interp(height=height_bins) + ) + except pd.errors.InvalidIndexError: + da = da.drop_duplicates("height", keep="first") + valid = np.isfinite(da["height"]) + da = ( + da.sel(height=valid).sortby("height").interp(height=height_bins) + ) + time_offset = time_offset.drop_duplicates("height", keep="first") else: target_height = xr.DataArray( height_bins, dims="height", name="height" ) da = da.reindex(height=target_height) + if "rhi" in radar.scan_type: + da["time_offset"] = time_offset # Add the latitude and longitude of the extracted column da["lat"], da["lon"] = lat, lon # Convert timeoffsets to timedelta object and precision on datetime64 - da.time_offset.data = da.time_offset.values.astype("timedelta64[s]") + da["time_offset"].data = da["time_offset"].values.astype("timedelta64[s]") da.base_time.data = da.base_time.values.astype("datetime64[s]") # Time is based off the start of the radar volume da["gate_time"] = ( @@ -296,6 +333,7 @@ def subset_points( # Concatenate the extracted radar columns for this scan across all sites ds = xr.concat([data for data in column_list if data], dim="station") ds = _add_station_vars(ds, sites, site_alt) + ds.attrs["input_datastream"] = radar.metadata["datastream"] del column_list, da finally: del radar @@ -323,17 +361,24 @@ def _prepare_match( grd_ds = ground else: _grd_raw = act.io.read_arm_netcdf( - ground, cleanup_qc=True, drop_variables=discard + ground, cleanup_qc=True, drop_variables=discard, parallel=False, ) - grd_ds = _grd_raw.compute() - _grd_raw.close() + grd_ds = _grd_raw if prefix: - grd_ds = grd_ds.rename_vars({v: f"{prefix}{v}" for v in grd_ds.data_vars}) + if prefix == "wxt_": + rename_dict = {v: f"{prefix}{v}" for v in grd_ds.data_vars if "wxt_" not in v} + else: + rename_dict = {v: f"{prefix}{v}" for v in grd_ds.data_vars} + + grd_ds = grd_ds.rename_vars(rename_dict) if "base_time" in grd_ds.data_vars: del grd_ds["base_time"] if "height" in grd_ds.dims: + if grd_ds["height"].attrs["units"] == "km": + grd_ds["height"] = grd_ds["height"] * 1000 + grd_ds["height"].attrs["units"] = "m" grd_ds = grd_ds.interp(height=column_height, method="linear") if "range" in grd_ds.dims: @@ -376,7 +421,9 @@ def _prepare_match( for var in matched.data_vars: matched[var].attrs.update(source=matched.datastream) - + matched[var].attrs["input_datastream"] = grd_ds.attrs["datastream"] + grd_ds.close() + _grd_raw.close() return site, matched From 8df3bf45de67f7226893e5d34fffeee7a98c0fe0 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Fri, 17 Apr 2026 17:37:16 +0000 Subject: [PATCH 09/14] ADD: Remove all NaNs from profile before interpolation. --- src/radclss/util/column_utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/radclss/util/column_utils.py b/src/radclss/util/column_utils.py index 24fe8d4..38ce83b 100644 --- a/src/radclss/util/column_utils.py +++ b/src/radclss/util/column_utils.py @@ -301,14 +301,17 @@ def subset_points( n_valid = int(valid.sum()) if n_valid > 0: try: + # Drop all NaNs + da = ( - da.sel(height=valid).sortby("height").interp(height=height_bins) + da.sel(height=valid).dropna("height").sortby("height").interp(height=height_bins) ) except pd.errors.InvalidIndexError: da = da.drop_duplicates("height", keep="first") + valid = np.isfinite(da["height"]) da = ( - da.sel(height=valid).sortby("height").interp(height=height_bins) + da.sel(height=valid).dropna("height").sortby("height").interp(height=height_bins) ) time_offset = time_offset.drop_duplicates("height", keep="first") else: From 6fb71fc28b20fe9054237e82c058719ba04a97e6 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Fri, 17 Apr 2026 18:04:22 +0000 Subject: [PATCH 10/14] FIX: Re-enable time averaging. --- src/radclss/core/radclss_core.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/radclss/core/radclss_core.py b/src/radclss/core/radclss_core.py index 815ab63..42737cc 100644 --- a/src/radclss/core/radclss_core.py +++ b/src/radclss/core/radclss_core.py @@ -127,9 +127,7 @@ def radclss( print(f"Using {time_coords} as time basis") print(f"Number of {time_coords} files: {len(volumes[time_coords])}") else: - raise NotImplementedError( - "Currently, only radar-based time coordinates are supported. Please specify a radar key from the volumes dictionary as the time_coords argument." - ) + # Call Subset Points columns = {} datastream_names = {} From c80b477383c4475646f6248bd42291e37b8314ca Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Fri, 17 Apr 2026 18:07:07 +0000 Subject: [PATCH 11/14] FIX: Syntax error --- src/radclss/core/radclss_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/radclss/core/radclss_core.py b/src/radclss/core/radclss_core.py index 42737cc..cf36aae 100644 --- a/src/radclss/core/radclss_core.py +++ b/src/radclss/core/radclss_core.py @@ -126,7 +126,6 @@ def radclss( if verbose: print(f"Using {time_coords} as time basis") print(f"Number of {time_coords} files: {len(volumes[time_coords])}") - else: # Call Subset Points columns = {} From 05ce8dab47e5855f0a4fb01dcccf898c13dae9e4 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Mon, 20 Apr 2026 21:36:40 +0000 Subject: [PATCH 12/14] FIX: Edits to add vertical columns from RHI --- src/radclss/core/radclss_core.py | 40 +++++++++- src/radclss/util/column_utils.py | 131 +++++++++++++++++++++++++++---- 2 files changed, 153 insertions(+), 18 deletions(-) diff --git a/src/radclss/core/radclss_core.py b/src/radclss/core/radclss_core.py index cf36aae..9b14104 100644 --- a/src/radclss/core/radclss_core.py +++ b/src/radclss/core/radclss_core.py @@ -203,6 +203,7 @@ def radclss( rad_key=k, ) columns[k].append(result) + except Exception as e: result = None if verbose: @@ -241,9 +242,20 @@ def radclss( if verbose: print(f" {k}: {len(columns[k])} columns") print(f" Time range: {min_times[k]} to {max_times[k]}") - + min_time = min(np.array([x for x in min_times.values()])) max_time = max(np.array([x for x in max_times.values()])) + if not "radar" in time_coords and not _is_valid_offset(time_coords): + if verbose: + print(f"Loading timestamps from {time_coords}...") + time_ds = xr.open_dataset(volumes[time_coords][0]) + ds_times = time_ds["time"].load().copy() + ds_times = ds_times.sel(time=slice(min_time, max_time)) + min_time = min(ds_times.values) + max_time = max(ds_times.values) + time_ds.close() + if verbose: + print(f" Time range: {min_time} to {max_time}") if verbose: print(f"\nOverall time range: {min_time} to {max_time}") @@ -264,7 +276,15 @@ def radclss( for x in columns[time_coords] ] ) - + elif _is_valid_offset(time_coords): + time_list = pd.date_range(min_time, max_time, freq=time_coords).strftime("%Y-%m-%dT%H:%M:%S") + else: + time_list = sorted( + [ + str(x.dt.strftime("%Y-%m-%dT%H:%M:%S").values) + for x in ds_times + ] + ) if verbose: print(f" Number of NEXRAD time steps to fetch: {len(time_list)}") print(f" Time list: {time_list[0]} to {time_list[-1]}") @@ -435,7 +455,7 @@ def _get_nexrad_wrapper(time_str): ds_concat[k] = ds_concat[k].reindex( time=nexrad_columns["time"], method="nearest" ) - else: + elif _is_valid_offset(time_coords): if verbose: print(f" Resampling to {time_coords} intervals") for k in ds_concat.keys(): @@ -444,13 +464,18 @@ def _get_nexrad_wrapper(time_str): nexrad_columns = nexrad_columns.resample(time=time_coords) # Then, reindex to the largest of the time arrays - new_coordinates = pd.date_range(min_time, max_time, time_coords) + new_coordinates = pd.date_range(min_time, max_time, freq=time_coords) if verbose: print(f" Creating new time grid: {len(new_coordinates)} time steps") for k in ds_concat.keys(): ds_concat[k] = ds_concat[k].reindex(time=new_coordinates) if nexrad: nexrad_columns = nexrad_columns.reindex(time=new_coordinates) + else: + for k in ds_concat.keys(): + ds_concat[k] = ds_concat[k].reindex(time=ds_times, method="nearest") + if nexrad: + nexrad_columns = nexrad_columns.reindex(time=ds_times, method="nearest") # Rename all variables according to their radar name if verbose: @@ -824,3 +849,10 @@ def _get_nexrad_wrapper(time_str): print("=" * 80) return ds + +def _is_valid_offset(s: str) -> bool: + try: + return pd.tseries.frequencies.to_offset(s) is not None + except ValueError: + return False + diff --git a/src/radclss/util/column_utils.py b/src/radclss/util/column_utils.py index 38ce83b..fff41f4 100644 --- a/src/radclss/util/column_utils.py +++ b/src/radclss/util/column_utils.py @@ -6,6 +6,7 @@ import pandas as pd import datetime import logging +import traceback from datetime import timedelta from botocore.config import Config @@ -30,12 +31,101 @@ def _read_nexrad_cache(path): if path not in _nexrad_cache.keys(): raw = pyart.io.read_nexrad_archive(path) _nexrad_cache[path] = raw - # Only maintain 5 files in the cache to save memory - if len(_nexrad_cache.keys()) > 5: + # Only maintain 15 files in the cache to save memory + if len(_nexrad_cache.keys()) > 15: first_key = next(iter(_nexrad_cache)) del _nexrad_cache[first_key] return _nexrad_cache[path] +def _grab_90_degree_rays(radar): + """ Special case for column right over the radar in an RHI""" + # Get the rays within 0.5 degrees of 90 degrees + ray = np.argmin(radar.elevation["data"] - 90.) + moment = {key: [] for key in radar.fields.keys()} + # Determine the center of each gate for the subsetted rays. + rhi_z = radar.range["data"] + + for key in moment: + moment[key] = radar.fields[key]["data"][ray, :].squeeze() + # Add radar elevation to height gates + # to define height as center of each gate above sea level + zgate = rhi_z + radar.altitude["data"][0] + # Determine the time at the center of each ray within the column + # Define the start of the radar volume as a numpy datetime object for xr + base_time = np.datetime64(pyart.util.datetime_from_radar(radar).isoformat(), "ns") + # Convert Py-ART radar object time (time since volume start) to time delta + # Add to base time to have sequential time within the xr Dataset + # for easier future merging/work + combined_time = pd.to_timedelta(radar.time["data"][ray], unit="s") + total_time = base_time + combined_time + + # Create a blank list to hold the xarray DataArrays + ds_container = [] + da_meta = [ + "units", + "standard_name", + "long_name", + "valid_max", + "valid_min", + "coordinates", + ] + # Convert the moment dictionary to xarray DataArray. + # Apply radar object meta data to DataArray attribute + for key in moment: + if key != "height": + da = xr.DataArray( + moment[key], + coords=dict(height=zgate), name=key, dims=["height"] + ) + for tag in da_meta: + if tag in radar.fields[key]: + da.attrs[tag] = radar.fields[key][tag] + # Append to ds container + ds_container.append(da.to_dataset(name=key)) + + # Add additional DataArrays 'base_time' and 'time_offset' + # if not present within the radar object. + da_base = xr.DataArray(base_time, name="base_time") + da_offset = xr.DataArray( + combined_time, coords=dict(height=zgate), name="time_offset", dims=["height"] + ) + ds_container.append(da_base.to_dataset(name="base_time")) + ds_container.append(da_offset.to_dataset(name="time_offset")) + + # Create a xarray DataSet from the DataArrays + column = xr.merge(ds_container) + + # Assign Attributes for the Height and Times + height_des = ( + "Height Above Sea Level [in meters] for the Center of Each" + + " Radar Gate Above the Target Location" + ) + column.height.attrs.update( + long_name="Height of Radar Beam", + units="m", + standard_name="height", + description=height_des, + ) + + column.base_time.attrs.update(long_name="UTC Reference Time", units="seconds") + + time_long = "Time in Seconds Since Volume Start" + time_des = ( + "Time in Seconds Since Volume Start that Cooresponds" + + " to the Center of Each Height Gate" + + " Above the Target Location" + ) + column.time_offset.attrs.update( + long_name=time_long, units="seconds", description=time_des + ) + + # Assign Global Attributes to the DataSet + column.attrs["distance_from_radar"] = "0 km" + column.attrs["azimuth"] = "0 degrees" + column.attrs["latitude_of_location"] = str(radar.latitude["data"][0]) + " degrees" + column.attrs["longitude_of_location"] = str(radar.longitude["data"][0]) + " degrees" + return column + def get_nexrad_column( rad_time, site, @@ -124,11 +214,12 @@ def get_nexrad_column( # NOTE: interpolating throughout Troposphere to match sonde to in the future da = pyart.util.columnsect.column_vertical_profile(radar_obj, lat, lon) + da = da.sortby("height") # check for valid heights valid = np.isfinite(da["height"]) n_valid = int(valid.sum()) if n_valid > 0: - da = da.sel(height=valid).sortby("height").interp(height=height_bins) + da = da.sortby("height").interp(height=height_bins) else: target_height = xr.DataArray(height_bins, dims="height", name="height") da = da.reindex(height=target_height) @@ -201,7 +292,7 @@ def subset_points( lats = list([x[0] for x in input_site_dict.values()]) lons = list([x[1] for x in input_site_dict.values()]) site_alt = list([x[2] for x in input_site_dict.values()]) - + sites = list(input_site_dict.keys()) try: radar = pyart.io.read(nfile, exclude_fields=DEFAULT_DISCARD_VAR[rad_key]) @@ -276,9 +367,10 @@ def subset_points( del z_dict, sonde_dict column_list = [] - for lat, lon in zip(lats, lons): + for lat, lon, site in zip(lats, lons, sites): # Make sure we are interpolating from the radar's location above sea level # NOTE: interpolating throughout Troposphere to match sonde to in the future + if not "rhi" in radar.scan_type: da = pyart.util.columnsect.column_vertical_profile(radar, lat, lon) else: @@ -286,32 +378,43 @@ def subset_points( da = pyart.util.get_field_location(radar, lat, lon) except ValueError: # If the columnsect fails, try adding 180 to the azimuths to account for potential mislabeling of radar location - radar.azimuth["data"] = radar.azimuth["data"] + 180 - # Need to adjust elevation as well to maintain the same relative geometry between radar and column locations - radar.elevation["data"] = 180 - radar.elevation["data"] + if np.all(radar.azimuth["data"] < 180): + radar.azimuth["data"] = radar.azimuth["data"] + 180 + # Need to adjust elevation as well to maintain the same relative geometry between radar and column locations + radar.elevation["data"] = 180 - radar.elevation["data"] try: da = pyart.util.get_field_location(radar, lat, lon) - except ValueError: - # NaNs will be returned if the columnsect fails again after adjusting the azimuths - da = pyart.util.columnsect.column_vertical_profile(radar, lat, lon) + except ValueError as e: + # Grab the vertically pointing ray(s) if the radar site == site + if radar.metadata["facility_id"] == site: + try: + da = _grab_90_degree_rays(radar) + except Exception as e: + logging.warning( + f"Failed to grab 90 degree rays for {site} from {nfile}." + + "NaNs will be returned for this column." + ) + else: + # NaNs will be returned if the columnsect fails again after adjusting the azimuths + da = pyart.util.columnsect.column_vertical_profile(radar, lat, lon) time_offset = da["time_offset"] # check for valid heights + da = da.sortby("height") valid = np.isfinite(da["height"]) n_valid = int(valid.sum()) if n_valid > 0: try: # Drop all NaNs - da = ( - da.sel(height=valid).dropna("height").sortby("height").interp(height=height_bins) + da.dropna("height").sortby("height").interp(height=height_bins) ) except pd.errors.InvalidIndexError: da = da.drop_duplicates("height", keep="first") valid = np.isfinite(da["height"]) da = ( - da.sel(height=valid).dropna("height").sortby("height").interp(height=height_bins) + da.dropna("height").sortby("height").interp(height=height_bins) ) time_offset = time_offset.drop_duplicates("height", keep="first") else: From 972f86a5629c0e3aafc507844671f8ad1bf076bc Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Mon, 20 Apr 2026 18:52:45 -0500 Subject: [PATCH 13/14] ADD: Vertical beams in RHI given as special case --- src/radclss/core/radclss_core.py | 41 ++-- src/radclss/util/column_utils.py | 70 ++++--- tests/test_radclss.py | 322 ++++++++++++++++--------------- 3 files changed, 233 insertions(+), 200 deletions(-) diff --git a/src/radclss/core/radclss_core.py b/src/radclss/core/radclss_core.py index 9b14104..49ee755 100644 --- a/src/radclss/core/radclss_core.py +++ b/src/radclss/core/radclss_core.py @@ -4,10 +4,7 @@ import act import numpy as np import pandas as pd -import os -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures import as_completed as futures_as_completed from ..util.column_utils import ( subset_points, @@ -126,7 +123,7 @@ def radclss( if verbose: print(f"Using {time_coords} as time basis") print(f"Number of {time_coords} files: {len(volumes[time_coords])}") - + # Call Subset Points columns = {} datastream_names = {} @@ -175,7 +172,7 @@ def radclss( print( f" ERROR processing file (total failures: {failed_count})" ) - datastream_names[k] = columns[k][-1].attrs["datastream"] + if verbose: print( f" Finished {k}: {successful_count} successful, {failed_count} failed" @@ -203,8 +200,8 @@ def radclss( rad_key=k, ) columns[k].append(result) - - except Exception as e: + + except Exception: result = None if verbose: if result is not None: @@ -220,7 +217,6 @@ def radclss( f" Finished {k}: {successful}/{len(columns[k])} successful extractions" ) - # Assemble individual columns into single DataSet # try: # Concatenate all extracted columns across time dimension to form daily timeseries @@ -242,10 +238,10 @@ def radclss( if verbose: print(f" {k}: {len(columns[k])} columns") print(f" Time range: {min_times[k]} to {max_times[k]}") - + min_time = min(np.array([x for x in min_times.values()])) max_time = max(np.array([x for x in max_times.values()])) - if not "radar" in time_coords and not _is_valid_offset(time_coords): + if "radar" not in time_coords and not _is_valid_offset(time_coords): if verbose: print(f"Loading timestamps from {time_coords}...") time_ds = xr.open_dataset(volumes[time_coords][0]) @@ -277,13 +273,12 @@ def radclss( ] ) elif _is_valid_offset(time_coords): - time_list = pd.date_range(min_time, max_time, freq=time_coords).strftime("%Y-%m-%dT%H:%M:%S") + time_list = pd.date_range(min_time, max_time, freq=time_coords).strftime( + "%Y-%m-%dT%H:%M:%S" + ) else: time_list = sorted( - [ - str(x.dt.strftime("%Y-%m-%dT%H:%M:%S").values) - for x in ds_times - ] + [str(x.dt.strftime("%Y-%m-%dT%H:%M:%S").values) for x in ds_times] ) if verbose: print(f" Number of NEXRAD time steps to fetch: {len(time_list)}") @@ -307,7 +302,7 @@ def _get_nexrad_wrapper(time_str): input_site_dict, nexrad_radar=nexrad_site, ) - + results = current_client.map(_get_nexrad_wrapper, time_list) successful_count = 0 @@ -335,6 +330,7 @@ def _get_nexrad_wrapper(time_str): else: if verbose: print(f" Fetching {len(time_list)} NEXRAD columns via thread pool...") + def _get_nexrad_wrapper(time_str): return get_nexrad_column( time_str, @@ -342,6 +338,7 @@ def _get_nexrad_wrapper(time_str): input_site_dict, nexrad_radar=nexrad_site, ) + successful_count = 0 failed_count = 0 for t in time_list: @@ -370,7 +367,6 @@ def _get_nexrad_wrapper(time_str): else: nexrad_columns = None - if verbose: print("\n" + "=" * 80) print("STEP 4: Concatenating and processing time coordinates") @@ -684,6 +680,10 @@ def _get_nexrad_wrapper(time_str): _instrument_tasks = [] vol_keys = list(volumes.keys()) for k in vol_keys: + if volumes[k] is None: + if verbose: + print(f"No files provided for instrument/site: {k}") + continue if len(volumes[k]) == 0: if verbose: print(f"No files found for instrument/site: {k}") @@ -811,8 +811,7 @@ def _get_nexrad_wrapper(time_str): _, matched = _prepare_match(**kwargs) if matched is not None: ds = _apply_match(ds, site, matched) - datastream_names[k] = matched.attrs["datastream"] - + else: # There is no column extraction raise RuntimeError(": RadCLss FAILURE (All Columns Failed to Extract): ") @@ -826,7 +825,7 @@ def _get_nexrad_wrapper(time_str): del ds["base_time"].attrs["units"] del ds["time_offset"].attrs["units"] del ds["time"].attrs["units"] - + if verbose: print(" Adding input datastream names to dataset attributes...") for k, v in datastream_names.items(): @@ -850,9 +849,9 @@ def _get_nexrad_wrapper(time_str): return ds + def _is_valid_offset(s: str) -> bool: try: return pd.tseries.frequencies.to_offset(s) is not None except ValueError: return False - diff --git a/src/radclss/util/column_utils.py b/src/radclss/util/column_utils.py index fff41f4..b8488e6 100644 --- a/src/radclss/util/column_utils.py +++ b/src/radclss/util/column_utils.py @@ -6,7 +6,6 @@ import pandas as pd import datetime import logging -import traceback from datetime import timedelta from botocore.config import Config @@ -27,6 +26,7 @@ def _read_sonde_cached(path, exclude): raw.close() return _sonde_cache[path] + def _read_nexrad_cache(path): if path not in _nexrad_cache.keys(): raw = pyart.io.read_nexrad_archive(path) @@ -37,14 +37,15 @@ def _read_nexrad_cache(path): del _nexrad_cache[first_key] return _nexrad_cache[path] + def _grab_90_degree_rays(radar): - """ Special case for column right over the radar in an RHI""" + """Special case for column right over the radar in an RHI""" # Get the rays within 0.5 degrees of 90 degrees - ray = np.argmin(radar.elevation["data"] - 90.) + ray = np.argmin(radar.elevation["data"] - 90.0) moment = {key: [] for key in radar.fields.keys()} # Determine the center of each gate for the subsetted rays. rhi_z = radar.range["data"] - + for key in moment: moment[key] = radar.fields[key]["data"][ray, :].squeeze() # Add radar elevation to height gates @@ -57,7 +58,6 @@ def _grab_90_degree_rays(radar): # Add to base time to have sequential time within the xr Dataset # for easier future merging/work combined_time = pd.to_timedelta(radar.time["data"][ray], unit="s") - total_time = base_time + combined_time # Create a blank list to hold the xarray DataArrays ds_container = [] @@ -74,8 +74,7 @@ def _grab_90_degree_rays(radar): for key in moment: if key != "height": da = xr.DataArray( - moment[key], - coords=dict(height=zgate), name=key, dims=["height"] + moment[key], coords=dict(height=zgate), name=key, dims=["height"] ) for tag in da_meta: if tag in radar.fields[key]: @@ -126,6 +125,7 @@ def _grab_90_degree_rays(radar): column.attrs["longitude_of_location"] = str(radar.longitude["data"][0]) + " degrees" return column + def get_nexrad_column( rad_time, site, @@ -292,7 +292,7 @@ def subset_points( lats = list([x[0] for x in input_site_dict.values()]) lons = list([x[1] for x in input_site_dict.values()]) site_alt = list([x[2] for x in input_site_dict.values()]) - + sites = list(input_site_dict.keys()) try: radar = pyart.io.read(nfile, exclude_fields=DEFAULT_DISCARD_VAR[rad_key]) @@ -307,7 +307,7 @@ def subset_points( # Check for RHI and reduce to first sweep if > 1 sweep if "rhi" in radar.scan_type: radar = radar.extract_sweeps([0]) - + # Check for single sweep scans if np.ma.is_masked(radar.sweep_start_ray_index["data"][1:]): radar.sweep_start_ray_index["data"] = np.ma.array([0]) @@ -336,10 +336,10 @@ def subset_points( for xfile in sonde ] # difference in time between radar file and each sonde file - start_diff = [radar_start - sonde for sonde in sonde_start] + start_diff = np.array([radar_start - sonde for sonde in sonde_start]) # merge the sonde file into the radar object (cached across radar files) - sonde_path = sonde[start_diff.index(min(abs(start_diff)))] + sonde_path = sonde[np.argmin(np.abs(start_diff))] ds_sonde = _read_sonde_cached(sonde_path, exclude_sonde) # create list of variables within sonde dataset to add to the radar file @@ -361,7 +361,9 @@ def subset_points( radar.fields["sonde_" + var]["standard_name"] = sonde_dict[ "standard_name" ] - radar.fields["sonde_" + var]["input_datastream"] = ds_sonde.datastream + radar.fields["sonde_" + var][ + "input_datastream" + ] = ds_sonde.datastream del radar_start, sonde_start, ds_sonde del z_dict, sonde_dict @@ -370,8 +372,8 @@ def subset_points( for lat, lon, site in zip(lats, lons, sites): # Make sure we are interpolating from the radar's location above sea level # NOTE: interpolating throughout Troposphere to match sonde to in the future - - if not "rhi" in radar.scan_type: + + if "rhi" not in radar.scan_type: da = pyart.util.columnsect.column_vertical_profile(radar, lat, lon) else: try: @@ -384,20 +386,22 @@ def subset_points( radar.elevation["data"] = 180 - radar.elevation["data"] try: da = pyart.util.get_field_location(radar, lat, lon) - except ValueError as e: + except ValueError: # Grab the vertically pointing ray(s) if the radar site == site if radar.metadata["facility_id"] == site: try: da = _grab_90_degree_rays(radar) - except Exception as e: + except Exception: logging.warning( f"Failed to grab 90 degree rays for {site} from {nfile}." + "NaNs will be returned for this column." ) else: # NaNs will be returned if the columnsect fails again after adjusting the azimuths - da = pyart.util.columnsect.column_vertical_profile(radar, lat, lon) - + da = pyart.util.columnsect.column_vertical_profile( + radar, lat, lon + ) + time_offset = da["time_offset"] # check for valid heights da = da.sortby("height") @@ -407,16 +411,22 @@ def subset_points( try: # Drop all NaNs da = ( - da.dropna("height").sortby("height").interp(height=height_bins) + da.dropna("height") + .sortby("height") + .interp(height=height_bins) ) except pd.errors.InvalidIndexError: da = da.drop_duplicates("height", keep="first") - + valid = np.isfinite(da["height"]) da = ( - da.dropna("height").sortby("height").interp(height=height_bins) + da.dropna("height") + .sortby("height") + .interp(height=height_bins) + ) + time_offset = time_offset.drop_duplicates( + "height", keep="first" ) - time_offset = time_offset.drop_duplicates("height", keep="first") else: target_height = xr.DataArray( height_bins, dims="height", name="height" @@ -428,7 +438,9 @@ def subset_points( # Add the latitude and longitude of the extracted column da["lat"], da["lon"] = lat, lon # Convert timeoffsets to timedelta object and precision on datetime64 - da["time_offset"].data = da["time_offset"].values.astype("timedelta64[s]") + da["time_offset"].data = da["time_offset"].values.astype( + "timedelta64[s]" + ) da.base_time.data = da.base_time.values.astype("datetime64[s]") # Time is based off the start of the radar volume da["gate_time"] = ( @@ -439,7 +451,7 @@ def subset_points( # Concatenate the extracted radar columns for this scan across all sites ds = xr.concat([data for data in column_list if data], dim="station") ds = _add_station_vars(ds, sites, site_alt) - ds.attrs["input_datastream"] = radar.metadata["datastream"] + del column_list, da finally: del radar @@ -467,12 +479,17 @@ def _prepare_match( grd_ds = ground else: _grd_raw = act.io.read_arm_netcdf( - ground, cleanup_qc=True, drop_variables=discard, parallel=False, + ground, + cleanup_qc=True, + drop_variables=discard, + parallel=False, ) grd_ds = _grd_raw if prefix: if prefix == "wxt_": - rename_dict = {v: f"{prefix}{v}" for v in grd_ds.data_vars if "wxt_" not in v} + rename_dict = { + v: f"{prefix}{v}" for v in grd_ds.data_vars if "wxt_" not in v + } else: rename_dict = {v: f"{prefix}{v}" for v in grd_ds.data_vars} @@ -527,7 +544,6 @@ def _prepare_match( for var in matched.data_vars: matched[var].attrs.update(source=matched.datastream) - matched[var].attrs["input_datastream"] = grd_ds.attrs["datastream"] grd_ds.close() _grd_raw.close() return site, matched diff --git a/tests/test_radclss.py b/tests/test_radclss.py index f2c3bea..aee3a0b 100644 --- a/tests/test_radclss.py +++ b/tests/test_radclss.py @@ -18,23 +18,24 @@ def test_radclss_serial(): if not username or not token: return # Skip test if credentials are not set - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) - - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-18T00:00:00", - "2025-06-20T00:00:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) + if glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfinterpolatedsondeM1.c1", + "2025-06-18T00:00:00", + "2025-06-20T00:00:00", + output=test_data_path, + ) for files in arm_test_data.DATASETS.registry.keys(): if "bnf" in files: if not os.path.exists(os.path.join(test_data_path, files)): @@ -43,7 +44,7 @@ def test_radclss_serial(): rad_path = os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc") radar_files = sorted(glob.glob(rad_path)) sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) + glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*")) ) vd_M1_files = glob.glob(os.path.join(test_data_path, "*bnfvdisquantsM1.c1*nc")) @@ -117,20 +118,24 @@ def test_radclss_serial(): ) assert not (my_columns["sonde_v_wind"].sel(station=site) == missing_value).all() missing_value = ( - my_columns["sonde_tdry"].sel(station=site).attrs.get("missing_value", None) + my_columns["sonde_temp"].sel(station=site).attrs.get("missing_value", None) ) - assert not (my_columns["sonde_tdry"].sel(station=site) == missing_value).all() + assert not (my_columns["sonde_temp"].sel(station=site) == missing_value).all() missing_value = ( my_columns["sonde_rh"].sel(station=site).attrs.get("missing_value", None) ) assert not (my_columns["sonde_rh"].sel(station=site) == missing_value).all() missing_value = ( - my_columns["sonde_pres"].sel(station=site).attrs.get("missing_value", None) + my_columns["sonde_bar_pres"] + .sel(station=site) + .attrs.get("missing_value", None) ) - assert not (my_columns["sonde_pres"].sel(station=site) == missing_value).all() + assert not ( + my_columns["sonde_bar_pres"].sel(station=site) == missing_value + ).all() # Met data check - for site in ["M1", "S20", "S30", "S40", "S13"]: + for site in ["M1", "S20", "S30", "S40"]: missing_value = ( my_columns["temp_mean"].sel(station=site).attrs.get("_FillValue", None) ) @@ -190,24 +195,25 @@ def test_radclss_parallel(): token = os.getenv("ARM_PASSWORD") if not username or not token: return # Skip test if credentials are not set - - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) - - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-18T00:00:00", - "2025-06-20T00:00:00", - output=test_data_path, - ) + # If data is not cached already, download the necessary data for testing + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) + if glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfinterpolatedsondeM1.c1", + "2025-06-18T00:00:00", + "2025-06-20T00:00:00", + output=test_data_path, + ) for files in arm_test_data.DATASETS.registry.keys(): if "bnf" in files: if not os.path.exists(os.path.join(test_data_path, files)): @@ -216,7 +222,7 @@ def test_radclss_parallel(): rad_path = os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc") radar_files = sorted(glob.glob(rad_path)) sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) + glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*")) ) vd_M1_files = glob.glob(os.path.join(test_data_path, "*bnfvdisquantsM1.c1*nc")) @@ -290,20 +296,24 @@ def test_radclss_parallel(): ) assert not (my_columns["sonde_v_wind"].sel(station=site) == missing_value).all() missing_value = ( - my_columns["sonde_tdry"].sel(station=site).attrs.get("missing_value", None) + my_columns["sonde_temp"].sel(station=site).attrs.get("missing_value", None) ) - assert not (my_columns["sonde_tdry"].sel(station=site) == missing_value).all() + assert not (my_columns["sonde_temp"].sel(station=site) == missing_value).all() missing_value = ( my_columns["sonde_rh"].sel(station=site).attrs.get("missing_value", None) ) assert not (my_columns["sonde_rh"].sel(station=site) == missing_value).all() missing_value = ( - my_columns["sonde_pres"].sel(station=site).attrs.get("missing_value", None) + my_columns["sonde_bar_pres"] + .sel(station=site) + .attrs.get("missing_value", None) ) - assert not (my_columns["sonde_pres"].sel(station=site) == missing_value).all() + assert not ( + my_columns["sonde_bar_pres"].sel(station=site) == missing_value + ).all() # Met data check - for site in ["M1", "S20", "S30", "S40", "S13"]: + for site in ["M1", "S20", "S30", "S40"]: missing_value = ( my_columns["temp_mean"].sel(station=site).attrs.get("_FillValue", None) ) @@ -363,23 +373,24 @@ def test_subset_points(): if not username or not token: return # Skip test if credentials are not set - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) - - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-18T00:00:00", - "2025-06-20T00:00:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) + if glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfsondewnpnM1.b1", + "2025-06-18T00:00:00", + "2025-06-20T00:00:00", + output=test_data_path, + ) rad_path = os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc") radar_files = sorted(glob.glob(rad_path)) @@ -394,13 +405,11 @@ def test_subset_points(): assert np.array_equal(subset_ds["height"].values, np.arange(500, 8500, 250)) assert "sonde_u_wind" not in subset_ds.data_vars assert "sonde_v_wind" not in subset_ds.data_vars - assert "sonde_tdry" not in subset_ds.data_vars + assert "sonde_temp" not in subset_ds.data_vars assert "sonde_rh" not in subset_ds.data_vars # Test with rawinsonde input instead of sonde=False - sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) - ) + sonde_files = sorted(glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*"))) subset_ds = radclss.util.subset_points( radar_files[0], input_site_dict, sonde=sonde_files @@ -414,7 +423,6 @@ def test_subset_points(): assert "sonde_rh" in subset_ds.data_vars assert "sonde_u_wind" in subset_ds.data_vars assert "sonde_v_wind" in subset_ds.data_vars - assert "sonde_tdry" in subset_ds.data_vars assert "sonde_rh" in subset_ds.data_vars @@ -431,34 +439,37 @@ def test_radclss_with_kasacr(): return # Skip test if credentials are not set # Download CSAPR2 data - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-01T12:00:00", - "2025-06-01T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-01T12:00:00", + "2025-06-01T12:30:00", + output=test_data_path, + ) # Download KASACR data - act.discovery.download_arm_data( - username, - token, - "bnfkasacrcfrS4.a1", - "2025-06-01T12:00:00", - "2025-06-01T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfkasacrcfrS4.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfkasacrcfrS4.a1", + "2025-06-01T12:00:00", + "2025-06-01T12:30:00", + output=test_data_path, + ) # Download sonde data - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-01T00:00:00", - "2025-06-02T00:00:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfinterpolatedsondeM1.c1", + "2025-06-01T00:00:00", + "2025-06-02T00:00:00", + output=test_data_path, + ) # Fetch any other test data for files in arm_test_data.DATASETS.registry.keys(): @@ -474,7 +485,7 @@ def test_radclss_with_kasacr(): glob.glob(os.path.join(test_data_path, "*bnfkasacrcfrS4.a1*.nc")) ) sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) + glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*")) ) # Gather other instrument files @@ -573,34 +584,37 @@ def test_radclss_with_kazr(): return # Skip test if credentials are not set # Download CSAPR2 data - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-01T12:00:00", - "2025-06-01T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-01T12:00:00", + "2025-06-01T12:30:00", + output=test_data_path, + ) # Download KASACR data - act.discovery.download_arm_data( - username, - token, - "bnfkazr2cfrprM1.a1", - "2025-06-01T12:00:00", - "2025-06-01T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfkazr2cfrprM1.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfkazr2cfrprM1.a1", + "2025-06-01T12:00:00", + "2025-06-01T12:30:00", + output=test_data_path, + ) # Download sonde data - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-01T00:00:00", - "2025-06-02T00:00:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfinterpolatedsondeM1.c1", + "2025-06-01T00:00:00", + "2025-06-02T00:00:00", + output=test_data_path, + ) # Fetch any other test data for files in arm_test_data.DATASETS.registry.keys(): @@ -616,7 +630,7 @@ def test_radclss_with_kazr(): glob.glob(os.path.join(test_data_path, "*bnfkazr2cfrprM1.a1*.nc")) ) sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) + glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*")) ) # Gather other instrument files @@ -715,14 +729,15 @@ def test_radclss_parallel_with_nexrad(): return # Skip test if credentials are not set # Download CSAPR2 data - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) # Gather all the radar files csapr2_files = sorted( @@ -807,34 +822,37 @@ def test_radclss_parallel_with_kasacr(): return # Skip test if credentials are not set # Download CSAPR2 data - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) # Download KASACR data - act.discovery.download_arm_data( - username, - token, - "bnfkasacrcfrS4.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfkasacrcfrS4.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfkasacrcfrS4.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) # Download sonde data - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-18T00:00:00", - "2025-06-20T00:00:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfinterpolatedsondeM1.c1", + "2025-06-18T00:00:00", + "2025-06-20T00:00:00", + output=test_data_path, + ) # Fetch any other test data for files in arm_test_data.DATASETS.registry.keys(): @@ -850,7 +868,7 @@ def test_radclss_parallel_with_kasacr(): glob.glob(os.path.join(test_data_path, "*bnfkasacrcfrS4.a1*.nc")) ) sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) + glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*")) ) # Gather other instrument files From 5f5ac68796ad96b86ccfc832e531de0d94b861f8 Mon Sep 17 00:00:00 2001 From: Robert Jackson Date: Mon, 20 Apr 2026 18:53:23 -0500 Subject: [PATCH 14/14] VER: 2026.4.20 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e261745..bbc16c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "radclss" -version = "2026.4.7" +version = "2026.4.20" description = "Extracted Radar Columns and In Situ Sensors" readme = "README.md" requires-python = ">=3.10"