diff --git a/.github/workflows/run-test-suite.yml b/.github/workflows/run-test-suite.yml index 5dd4fef..bc4f32d 100644 --- a/.github/workflows/run-test-suite.yml +++ b/.github/workflows/run-test-suite.yml @@ -23,7 +23,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] + python-version: ["3.10", "3.11", "3.12", "3.13"] os: [macOS, ubuntu] include: - os: macos-latest diff --git a/pyproject.toml b/pyproject.toml index ffd1457..bbc16c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "radclss" -version = "2026.3.6" +version = "2026.4.20" description = "Extracted Radar Columns and In Situ Sensors" readme = "README.md" requires-python = ">=3.10" diff --git a/src/radclss/config/default_config.py b/src/radclss/config/default_config.py index d789005..9a7603a 100644 --- a/src/radclss/config/default_config.py +++ b/src/radclss/config/default_config.py @@ -52,15 +52,12 @@ "height_over_iso0", ], "radar_kasacr": [ - "signal_to_noise_ratio_copolar_h", "signal_to_noise_ratio_copolar_v", ], "radar_xsacr": [ - "signal_to_noise_ratio_copolar_h", "signal_to_noise_ratio_copolar_v", ], "kazr2": [ - "signal_to_noise_ratio_copolar_h", "signal_to_noise_ratio_crosspolar_v", ], "radar_csapr2": [ @@ -76,7 +73,6 @@ "uncorrected_reflectivity_v", "uncorrected_spectral_width_h", "uncorrected_spectral_width_v", - "signal_to_noise_ratio_copolar_h", "signal_to_noise_ratio_copolar_v", "normalized_coherent_power", "normalized_coherent_power_v", @@ -165,33 +161,12 @@ "accum_total_nrt", ], "ldquants": [ - "specific_differential_attenuation_xband20c", - "specific_differential_attenuation_kaband20c", - "specific_differential_attenuation_sband20c", "bringi_conv_stra_flag", "exppsd_slope", "norm_num_concen", "num_concen", "gammapsd_shape", "gammapsd_slope", - "mean_doppler_vel_wband20c", - "mean_doppler_vel_kaband20c", - "mean_doppler_vel_xband20c", - "mean_doppler_vel_sband20c", - "specific_attenuation_kaband20c", - "specific_attenuation_xband20c", - "specific_attenuation_sband20c", - "specific_differential_phase_kaband20c", - "specific_differential_phase_xband20c", - "specific_differential_phase_sband20c", - "differential_reflectivity_kaband20c", - "differential_reflectivity_xband20c", - "differential_reflectivity_sband20c", - "reflectivity_factor_wband20c", - "reflectivity_factor_kaband20c", - "reflectivity_factor_xband20c", - "reflectivity_factor_sband20c", - "bringi_conv_stra_flag", "time_offset", "base_time", "lat", @@ -199,32 +174,12 @@ "alt", ], "vdisquants": [ - "specific_differential_attenuation_xband20c", - "specific_differential_attenuation_kaband20c", - "specific_differential_attenuation_sband20c", "bringi_conv_stra_flag", "exppsd_slope", "norm_num_concen", "num_concen", "gammapsd_shape", "gammapsd_slope", - "mean_doppler_vel_wband20c", - "mean_doppler_vel_kaband20c", - "mean_doppler_vel_xband20c", - "mean_doppler_vel_sband20c", - "specific_attenuation_kaband20c", - "specific_attenuation_xband20c", - "specific_attenuation_sband20c", - "specific_differential_phase_kaband20c", - "specific_differential_phase_xband20c", - "specific_differential_phase_sband20c", - "differential_reflectivity_kaband20c", - "differential_reflectivity_xband20c", - "differential_reflectivity_sband20c", - "reflectivity_factor_wband20c", - "reflectivity_factor_kaband20c", - "reflectivity_factor_xband20c", - "reflectivity_factor_sband20c", "bringi_conv_stra_flag", "time_offset", "base_time", @@ -237,19 +192,11 @@ "time_offset", "time_bounds", "qc_temp_mean", - "temp_std", - "rh_mean", "qc_rh_mean", - "rh_std", - "atmos_pressure", "qc_atmos_pressure", - "wspd_arith_mean", "qc_wspd_arith_mean", - "wspd_vec_mean", "qc_wspd_vec_mean", - "wdir_vec_mean", "qc_wdir_vec_mean", - "wdir_vec_std", "qc_wxt_precip_rate_mean", "qc_wxt_cumul_precip", "logger_volt", diff --git a/src/radclss/config/output_config.py b/src/radclss/config/output_config.py index 427fbda..ec1a9fd 100644 --- a/src/radclss/config/output_config.py +++ b/src/radclss/config/output_config.py @@ -1,7 +1,7 @@ OUTPUT_SITE = "BNF" OUTPUT_FACILITY = "M1" OUTPUT_PLATFORM = "radclss" -OUTPUT_LEVEL = "c2" +OUTPUT_LEVEL = "c0" OUTPUT_GATE_TIME_ATTRS = { "long_name": ( "Time in Seconds that Cooresponds to the Start" diff --git a/src/radclss/core/radclss_core.py b/src/radclss/core/radclss_core.py index 635c9c1..49ee755 100644 --- a/src/radclss/core/radclss_core.py +++ b/src/radclss/core/radclss_core.py @@ -5,7 +5,13 @@ import numpy as np import pandas as pd -from ..util.column_utils import subset_points, match_datasets_act, get_nexrad_column + +from ..util.column_utils import ( + subset_points, + get_nexrad_column, + _prepare_match, + _apply_match, +) from ..config.default_config import DEFAULT_DISCARD_VAR from ..config.output_config import get_output_config from dask.distributed import Client, as_completed @@ -117,12 +123,10 @@ def radclss( if verbose: print(f"Using {time_coords} as time basis") print(f"Number of {time_coords} files: {len(volumes[time_coords])}") - else: - raise NotImplementedError( - "Currently, only radar-based time coordinates are supported. Please specify a radar key from the volumes dictionary as the time_coords argument." - ) + # Call Subset Points columns = {} + datastream_names = {} if verbose: print("\n" + "=" * 80) print("STEP 1: Extracting radar columns") @@ -146,7 +150,7 @@ def radclss( results = current_client.map( subset_points, volumes[k], - sonde=volumes["sonde"], + sonde=None, input_site_dict=input_site_dict, height_bins=height_bins, rad_key=k, @@ -187,14 +191,18 @@ def radclss( print( f" [{file_count}/{len(volumes[k])}] Processing: {rad.split('/')[-1]}" ) - result = subset_points( - rad, - sonde=volumes["sonde"], - input_site_dict=input_site_dict, - height_bins=height_bins, - rad_key=k, - ) - columns[k].append(result) + try: + result = subset_points( + rad, + sonde=None, + input_site_dict=input_site_dict, + height_bins=height_bins, + rad_key=k, + ) + columns[k].append(result) + + except Exception: + result = None if verbose: if result is not None: print( @@ -233,6 +241,17 @@ def radclss( min_time = min(np.array([x for x in min_times.values()])) max_time = max(np.array([x for x in max_times.values()])) + if "radar" not in time_coords and not _is_valid_offset(time_coords): + if verbose: + print(f"Loading timestamps from {time_coords}...") + time_ds = xr.open_dataset(volumes[time_coords][0]) + ds_times = time_ds["time"].load().copy() + ds_times = ds_times.sel(time=slice(min_time, max_time)) + min_time = min(ds_times.values) + max_time = max(ds_times.values) + time_ds.close() + if verbose: + print(f" Time range: {min_time} to {max_time}") if verbose: print(f"\nOverall time range: {min_time} to {max_time}") @@ -253,7 +272,14 @@ def radclss( for x in columns[time_coords] ] ) - + elif _is_valid_offset(time_coords): + time_list = pd.date_range(min_time, max_time, freq=time_coords).strftime( + "%Y-%m-%dT%H:%M:%S" + ) + else: + time_list = sorted( + [str(x.dt.strftime("%Y-%m-%dT%H:%M:%S").values) for x in ds_times] + ) if verbose: print(f" Number of NEXRAD time steps to fetch: {len(time_list)}") print(f" Time list: {time_list[0]} to {time_list[-1]}") @@ -303,18 +329,34 @@ def _get_nexrad_wrapper(time_str): ) else: if verbose: - print(" Processing NEXRAD columns in serial mode...") - for i, time_str in enumerate(time_list, 1): - if verbose and i % 5 == 0: - print(f" [{i}/{len(time_list)}] Fetching NEXRAD for {time_str}") - nexrad_columns.append( - get_nexrad_column( - time_str, - output_config["site"], - input_site_dict, - ) + print(f" Fetching {len(time_list)} NEXRAD columns via thread pool...") + + def _get_nexrad_wrapper(time_str): + return get_nexrad_column( + time_str, + output_config["site"], + input_site_dict, + nexrad_radar=nexrad_site, ) + successful_count = 0 + failed_count = 0 + for t in time_list: + try: + nexrad_columns.append(_get_nexrad_wrapper(t)) + successful_count += 1 + if verbose and successful_count % 5 == 0: + print( + f" Completed {successful_count}/{len(time_list)} NEXRAD columns..." + ) + except Exception as error: + failed_count += 1 + if verbose: + print( + f" ERROR fetching NEXRAD data (total failures: {failed_count})" + ) + logging.exception(error) + if verbose: valid_nexrad = sum(1 for x in nexrad_columns if x is not None) print(f" Concatenating {valid_nexrad} valid NEXRAD columns...") @@ -333,12 +375,20 @@ def _get_nexrad_wrapper(time_str): output_platform = output_config["platform"] output_level = output_config["level"] - # Convert time variables to something xarray understands + # Convert time variables to something xarray understands. + # Concat in chunks of 50 so only a fraction of per-file datasets are held in + # memory at once before being folded into an intermediate result. + _CONCAT_CHUNK = 50 ds_concat = {} for k in columns.keys(): if verbose: print(f" Processing {k}...") - ds_concat[k] = xr.concat([data for data in columns[k] if data], dim="time") + valid = [data for data in columns[k] if data is not None] + chunks = [ + xr.concat(valid[i : i + _CONCAT_CHUNK], dim="time") + for i in range(0, len(valid), _CONCAT_CHUNK) + ] + ds_concat[k] = xr.concat(chunks, dim="time") if len(chunks) > 1 else chunks[0] if verbose: print( f" Concatenated dimensions: time={ds_concat[k].dims['time']}, station={ds_concat[k].dims['station']}, height={ds_concat[k].dims['height']}" @@ -401,7 +451,7 @@ def _get_nexrad_wrapper(time_str): ds_concat[k] = ds_concat[k].reindex( time=nexrad_columns["time"], method="nearest" ) - else: + elif _is_valid_offset(time_coords): if verbose: print(f" Resampling to {time_coords} intervals") for k in ds_concat.keys(): @@ -410,13 +460,18 @@ def _get_nexrad_wrapper(time_str): nexrad_columns = nexrad_columns.resample(time=time_coords) # Then, reindex to the largest of the time arrays - new_coordinates = pd.date_range(min_time, max_time, time_coords) + new_coordinates = pd.date_range(min_time, max_time, freq=time_coords) if verbose: print(f" Creating new time grid: {len(new_coordinates)} time steps") for k in ds_concat.keys(): ds_concat[k] = ds_concat[k].reindex(time=new_coordinates) if nexrad: nexrad_columns = nexrad_columns.reindex(time=new_coordinates) + else: + for k in ds_concat.keys(): + ds_concat[k] = ds_concat[k].reindex(time=ds_times, method="nearest") + if nexrad: + nexrad_columns = nexrad_columns.reindex(time=ds_times, method="nearest") # Rename all variables according to their radar name if verbose: @@ -497,7 +552,7 @@ def _get_nexrad_wrapper(time_str): if verbose: print(" Merging NEXRAD data into combined dataset...") ds_concat = xr.merge([ds_concat, nexrad_columns]) - + if verbose: print(f" Total variables in merged dataset: {len(ds_concat.data_vars)}") print("\n" + "=" * 80) @@ -601,20 +656,12 @@ def _get_nexrad_wrapper(time_str): if verbose: print("\n Freeing memory: deleting intermediate datasets...") ds_concat.close() - if serial is not True: - if current_client is None: - try: - current_client = Client.current() - except ValueError: - raise RuntimeError( - "No Dask client found. Please start a Dask client before running in parallel mode." - ) - current_client.restart() del ds_concat - - - # Free up Memory + del columns + if verbose and serial is False: + print("\n Restarting dask client.") + current_client.restart() # If successful column extraction, apply in-situ if ds: @@ -627,10 +674,15 @@ def _get_nexrad_wrapper(time_str): print("=" * 80) print(f" Radar processing completed at: {time.strftime('%H:%M:%S')}") - # Find all of the met stations and match to columns + # Build a task list: (label, site, _prepare_match kwargs) for every instrument. + # All entries are independent so we can load+resample them concurrently, then + # apply results sequentially into ds. + _instrument_tasks = [] vol_keys = list(volumes.keys()) for k in vol_keys: - if k == "sonde": + if volumes[k] is None: + if verbose: + print(f"No files provided for instrument/site: {k}") continue if len(volumes[k]) == 0: if verbose: @@ -641,76 +693,125 @@ def _get_nexrad_wrapper(time_str): else: instrument = k site = base_station + site = site.upper() if instrument == "kazr2": - if verbose: - print(f"Matching KAZR2 data for site: {site}") - ds = match_datasets_act( - ds, - volumes[k], - site.upper(), - discard=discard_var["kazr2"], - resample="mean", - prefix="kazr2_", - verbose=verbose, + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["kazr2"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + prefix="kazr2_", + ), + ) ) - if instrument == "met": - if verbose: - print(f"Matching MET data for site: {site}") - ds = match_datasets_act( - ds, - volumes[k][0], - site.upper(), - resample="mean", - discard=discard_var["met"], - verbose=verbose, + elif instrument == "met": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k][0], + site=site, + discard=discard_var["met"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + ), + ) ) - - if instrument == "pluvio": - # Weighing Bucket Rain Gauge - ds = match_datasets_act( - ds, - volumes[k], - site.upper(), - resample="sum", - discard=discard_var["pluvio"], - verbose=verbose, + elif instrument == "pluvio": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["pluvio"], + column_time=ds.time, + column_height=ds["height"], + resample="sum", + ), + ) ) - - if instrument == "ld": - ds = match_datasets_act( - ds, - volumes[k], - site.upper(), - discard=discard_var["ldquants"], - resample="mean", - prefix="ldquants_", - verbose=verbose, + elif instrument == "ld": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["ldquants"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + prefix="ldquants_", + ), + ) ) - - if instrument == "vd": - # Laser Disdrometer - Supplemental Site - ds = match_datasets_act( - ds, - volumes[k], - site.upper(), - discard=discard_var["vdisquants"], - resample="mean", - prefix="vdisquants_", - verbose=verbose, + elif instrument == "vd": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["vdisquants"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + prefix="vdisquants_", + ), + ) ) - - if instrument == "wxt": - # Laser Disdrometer - Supplemental Site - ds = match_datasets_act( - ds, - volumes[k], - site.upper(), - discard=discard_var["wxt"], - resample="mean", - verbose=verbose, + elif instrument == "wxt": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["wxt"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + prefix="wxt_", + ), + ) + ) + elif instrument == "sonde": + _instrument_tasks.append( + ( + k, + site, + dict( + ground=volumes[k], + site=site, + discard=discard_var["sonde"], + column_time=ds.time, + column_height=ds["height"], + resample="mean", + prefix="sonde_", + ), + ) ) + results = [] + for k, site, kwargs in _instrument_tasks: + _, matched = _prepare_match(**kwargs) + if matched is not None: + ds = _apply_match(ds, site, matched) + else: # There is no column extraction raise RuntimeError(": RadCLss FAILURE (All Columns Failed to Extract): ") @@ -725,6 +826,14 @@ def _get_nexrad_wrapper(time_str): del ds["time_offset"].attrs["units"] del ds["time"].attrs["units"] + if verbose: + print(" Adding input datastream names to dataset attributes...") + for k, v in datastream_names.items(): + ds_type = k.split("_")[0] + for var in ds.data_vars: + if var.startswith(ds_type): + ds[var].attrs["input_datastream"] = v + if verbose: print("\n" + "=" * 80) print(f"RadCLss Processing Complete for {volumes['date']}") @@ -739,3 +848,10 @@ def _get_nexrad_wrapper(time_str): print("=" * 80) return ds + + +def _is_valid_offset(s: str) -> bool: + try: + return pd.tseries.frequencies.to_offset(s) is not None + except ValueError: + return False diff --git a/src/radclss/util/column_utils.py b/src/radclss/util/column_utils.py index 1ca1f39..b8488e6 100644 --- a/src/radclss/util/column_utils.py +++ b/src/radclss/util/column_utils.py @@ -3,6 +3,7 @@ import act import numpy as np import xarray as xr +import pandas as pd import datetime import logging @@ -13,6 +14,117 @@ from ..config import DEFAULT_DISCARD_VAR, DEFAULT_NEXRAD_RADARS from ..config import get_output_config +_sonde_cache = {} +_nexrad_cache = {} + + +def _read_sonde_cached(path, exclude): + """Return a fully-loaded sonde dataset, reading from disk only on first call.""" + if path not in _sonde_cache: + raw = act.io.read_arm_netcdf(path, cleanup_qc=True, drop_variables=exclude) + _sonde_cache[path] = raw.compute() + raw.close() + return _sonde_cache[path] + + +def _read_nexrad_cache(path): + if path not in _nexrad_cache.keys(): + raw = pyart.io.read_nexrad_archive(path) + _nexrad_cache[path] = raw + # Only maintain 15 files in the cache to save memory + if len(_nexrad_cache.keys()) > 15: + first_key = next(iter(_nexrad_cache)) + del _nexrad_cache[first_key] + return _nexrad_cache[path] + + +def _grab_90_degree_rays(radar): + """Special case for column right over the radar in an RHI""" + # Get the rays within 0.5 degrees of 90 degrees + ray = np.argmin(radar.elevation["data"] - 90.0) + moment = {key: [] for key in radar.fields.keys()} + # Determine the center of each gate for the subsetted rays. + rhi_z = radar.range["data"] + + for key in moment: + moment[key] = radar.fields[key]["data"][ray, :].squeeze() + # Add radar elevation to height gates + # to define height as center of each gate above sea level + zgate = rhi_z + radar.altitude["data"][0] + # Determine the time at the center of each ray within the column + # Define the start of the radar volume as a numpy datetime object for xr + base_time = np.datetime64(pyart.util.datetime_from_radar(radar).isoformat(), "ns") + # Convert Py-ART radar object time (time since volume start) to time delta + # Add to base time to have sequential time within the xr Dataset + # for easier future merging/work + combined_time = pd.to_timedelta(radar.time["data"][ray], unit="s") + + # Create a blank list to hold the xarray DataArrays + ds_container = [] + da_meta = [ + "units", + "standard_name", + "long_name", + "valid_max", + "valid_min", + "coordinates", + ] + # Convert the moment dictionary to xarray DataArray. + # Apply radar object meta data to DataArray attribute + for key in moment: + if key != "height": + da = xr.DataArray( + moment[key], coords=dict(height=zgate), name=key, dims=["height"] + ) + for tag in da_meta: + if tag in radar.fields[key]: + da.attrs[tag] = radar.fields[key][tag] + # Append to ds container + ds_container.append(da.to_dataset(name=key)) + + # Add additional DataArrays 'base_time' and 'time_offset' + # if not present within the radar object. + da_base = xr.DataArray(base_time, name="base_time") + da_offset = xr.DataArray( + combined_time, coords=dict(height=zgate), name="time_offset", dims=["height"] + ) + ds_container.append(da_base.to_dataset(name="base_time")) + ds_container.append(da_offset.to_dataset(name="time_offset")) + + # Create a xarray DataSet from the DataArrays + column = xr.merge(ds_container) + + # Assign Attributes for the Height and Times + height_des = ( + "Height Above Sea Level [in meters] for the Center of Each" + + " Radar Gate Above the Target Location" + ) + column.height.attrs.update( + long_name="Height of Radar Beam", + units="m", + standard_name="height", + description=height_des, + ) + + column.base_time.attrs.update(long_name="UTC Reference Time", units="seconds") + + time_long = "Time in Seconds Since Volume Start" + time_des = ( + "Time in Seconds Since Volume Start that Cooresponds" + + " to the Center of Each Height Gate" + + " Above the Target Location" + ) + column.time_offset.attrs.update( + long_name=time_long, units="seconds", description=time_des + ) + + # Assign Global Attributes to the DataSet + column.attrs["distance_from_radar"] = "0 km" + column.attrs["azimuth"] = "0 degrees" + column.attrs["latitude_of_location"] = str(radar.latitude["data"][0]) + " degrees" + column.attrs["longitude_of_location"] = str(radar.longitude["data"][0]) + " degrees" + return column + def get_nexrad_column( rad_time, @@ -94,34 +206,39 @@ def get_nexrad_column( time_list = np.array(time_list) path = f"s3://{bucket_name}/" + file_list[np.argmin(np.abs(time_list - right_now))] - radar_obj = pyart.io.read_nexrad_archive(path) - column_list = [] - for lat, lon in zip(lats, lons): - # Make sure we are interpolating from the radar's location above sea level - # NOTE: interpolating throughout Troposphere to match sonde to in the future - - da = pyart.util.columnsect.column_vertical_profile(radar_obj, lat, lon) - # check for valid heights - valid = np.isfinite(da["height"]) - n_valid = int(valid.sum()) - if n_valid > 0: - da = da.sel(height=valid).sortby("height").interp(height=height_bins) - else: - target_height = xr.DataArray(height_bins, dims="height", name="height") - da = da.reindex(height=target_height) - - # Add the latitude and longitude of the extracted column - da["lat"], da["lon"] = lat, lon - # Convert timeoffsets to timedelta object and precision on datetime64 - da.time_offset.data = da.time_offset.values.astype("timedelta64[s]") - da.base_time.data = da.base_time.values.astype("datetime64[s]") - # Time is based off the start of the radar volume - da["gate_time"] = da.base_time.values + da.isel(height=0).time_offset.values - column_list.append(da) + radar_obj = _read_nexrad_cache(path) + try: + column_list = [] + for lat, lon in zip(lats, lons): + # Make sure we are interpolating from the radar's location above sea level + # NOTE: interpolating throughout Troposphere to match sonde to in the future + + da = pyart.util.columnsect.column_vertical_profile(radar_obj, lat, lon) + da = da.sortby("height") + # check for valid heights + valid = np.isfinite(da["height"]) + n_valid = int(valid.sum()) + if n_valid > 0: + da = da.sortby("height").interp(height=height_bins) + else: + target_height = xr.DataArray(height_bins, dims="height", name="height") + da = da.reindex(height=target_height) + + # Add the latitude and longitude of the extracted column + da["lat"], da["lon"] = lat, lon + # Convert timeoffsets to timedelta object and precision on datetime64 + da.time_offset.data = da.time_offset.values.astype("timedelta64[s]") + da.base_time.data = da.base_time.values.astype("datetime64[s]") + # Time is based off the start of the radar volume + da["gate_time"] = da.base_time.values + da.isel(height=0).time_offset.values + column_list.append(da) + finally: + del radar_obj # Concatenate the extracted radar columns for this scan across all sites ds = xr.concat([data for data in column_list if data], dim="station") ds = _add_station_vars(ds, sites, site_alt) + ds.attrs["nexrad_radar"] = nexrad_radar del column_list, da return ds @@ -184,12 +301,18 @@ def subset_points( f"{nfile} failed to open and is possibly corrupt." + "RadCLss will not generate a column for this file." ) - # Check for single sweep scans - if np.ma.is_masked(radar.sweep_start_ray_index["data"][1:]): - radar.sweep_start_ray_index["data"] = np.ma.array([0]) - radar.sweep_end_ray_index["data"] = np.ma.array([radar.nrays]) + return ds + + try: + # Check for RHI and reduce to first sweep if > 1 sweep + if "rhi" in radar.scan_type: + radar = radar.extract_sweeps([0]) + + # Check for single sweep scans + if np.ma.is_masked(radar.sweep_start_ray_index["data"][1:]): + radar.sweep_start_ray_index["data"] = np.ma.array([0]) + radar.sweep_end_ray_index["data"] = np.ma.array([radar.nrays]) - if radar: if radar.time["data"].size > 0: # Easier to map the nearest sonde file to radar gates before extraction if sonde is not None: @@ -213,14 +336,11 @@ def subset_points( for xfile in sonde ] # difference in time between radar file and each sonde file - start_diff = [radar_start - sonde for sonde in sonde_start] + start_diff = np.array([radar_start - sonde for sonde in sonde_start]) - # merge the sonde file into the radar object - ds_sonde = act.io.read_arm_netcdf( - sonde[start_diff.index(min(start_diff))], - cleanup_qc=True, - drop_variables=exclude_sonde, - ) + # merge the sonde file into the radar object (cached across radar files) + sonde_path = sonde[np.argmin(np.abs(start_diff))] + ds_sonde = _read_sonde_cached(sonde_path, exclude_sonde) # create list of variables within sonde dataset to add to the radar file for var in list(ds_sonde.keys()): @@ -241,34 +361,86 @@ def subset_points( radar.fields["sonde_" + var]["standard_name"] = sonde_dict[ "standard_name" ] - radar.fields["sonde_" + var]["datastream"] = ds_sonde.datastream + radar.fields["sonde_" + var][ + "input_datastream" + ] = ds_sonde.datastream del radar_start, sonde_start, ds_sonde del z_dict, sonde_dict column_list = [] - for lat, lon in zip(lats, lons): + for lat, lon, site in zip(lats, lons, sites): # Make sure we are interpolating from the radar's location above sea level # NOTE: interpolating throughout Troposphere to match sonde to in the future - da = pyart.util.columnsect.column_vertical_profile(radar, lat, lon) + if "rhi" not in radar.scan_type: + da = pyart.util.columnsect.column_vertical_profile(radar, lat, lon) + else: + try: + da = pyart.util.get_field_location(radar, lat, lon) + except ValueError: + # If the columnsect fails, try adding 180 to the azimuths to account for potential mislabeling of radar location + if np.all(radar.azimuth["data"] < 180): + radar.azimuth["data"] = radar.azimuth["data"] + 180 + # Need to adjust elevation as well to maintain the same relative geometry between radar and column locations + radar.elevation["data"] = 180 - radar.elevation["data"] + try: + da = pyart.util.get_field_location(radar, lat, lon) + except ValueError: + # Grab the vertically pointing ray(s) if the radar site == site + if radar.metadata["facility_id"] == site: + try: + da = _grab_90_degree_rays(radar) + except Exception: + logging.warning( + f"Failed to grab 90 degree rays for {site} from {nfile}." + + "NaNs will be returned for this column." + ) + else: + # NaNs will be returned if the columnsect fails again after adjusting the azimuths + da = pyart.util.columnsect.column_vertical_profile( + radar, lat, lon + ) + + time_offset = da["time_offset"] # check for valid heights + da = da.sortby("height") valid = np.isfinite(da["height"]) n_valid = int(valid.sum()) if n_valid > 0: - da = ( - da.sel(height=valid).sortby("height").interp(height=height_bins) - ) + try: + # Drop all NaNs + da = ( + da.dropna("height") + .sortby("height") + .interp(height=height_bins) + ) + except pd.errors.InvalidIndexError: + da = da.drop_duplicates("height", keep="first") + + valid = np.isfinite(da["height"]) + da = ( + da.dropna("height") + .sortby("height") + .interp(height=height_bins) + ) + time_offset = time_offset.drop_duplicates( + "height", keep="first" + ) else: target_height = xr.DataArray( height_bins, dims="height", name="height" ) da = da.reindex(height=target_height) + if "rhi" in radar.scan_type: + da["time_offset"] = time_offset # Add the latitude and longitude of the extracted column da["lat"], da["lon"] = lat, lon # Convert timeoffsets to timedelta object and precision on datetime64 - da.time_offset.data = da.time_offset.values.astype("timedelta64[s]") + da["time_offset"].data = da["time_offset"].values.astype( + "timedelta64[s]" + ) da.base_time.data = da.base_time.values.astype("datetime64[s]") # Time is based off the start of the radar volume da["gate_time"] = ( @@ -279,150 +451,106 @@ def subset_points( # Concatenate the extracted radar columns for this scan across all sites ds = xr.concat([data for data in column_list if data], dim="station") ds = _add_station_vars(ds, sites, site_alt) - # delete the radar to free up memory - del radar, column_list, da - else: - # delete the rhi file - del radar + + del column_list, da + finally: + del radar return ds -def match_datasets_act( - column, +def _prepare_match( ground, site, discard, + column_time, + column_height, resample="sum", resample_time="5Min", DataSet=False, prefix=None, - verbose=False, ): """ - Time synchronization of a Ground Instrumentation Dataset to - a Radar Column for Specific Locations using the ARM ACT package. - This module also supports vertically pointing radars such as the KAZR. - - Parameters - ---------- - column : Xarray DataSet - Xarray DataSet containing the extracted radar column above multiple locations. - Dimensions should include Time, Height, Site - - ground : str; Xarray DataSet - String containing the path of the ground instrumentation file that is desired - to be included within the extracted radar column dataset. - If DataSet is set to True, ground is Xarray Dataset and will skip I/O. - - site : str - Location of the ground instrument. Should be included within the filename. - - discard : list - List containing the desired input ground instrumentation variables to be - removed from the xarray DataSet. - - resample : str - Mathematical operational for resampling ground instrumentation to the radar time. - 'sum' will sum the ground data within the resample_time window, - 'mean' will average the ground data within the resample_time window, - and 'skip' will not resample the data and will only interpolate to the radar time. - Default is 'sum'. - - resample_time : str - Time resolution for resampling ground instrumentation data before mapping to radar time. - Default is "5Min". - - DataSet : boolean - Boolean flag to determine if ground input is an Xarray Dataset. - Set to True if ground input is Xarray DataSet. + Load a ground instrument file, resample it to the column time/height grid, + and return ``(site, matched_dataset)`` without modifying the column in-place. - prefix : str - prefix for the desired spelling of variable names for the input - datastream (to fix duplicate variable names between instruments) - - verbose : boolean - Boolean flag to set verbose output during processing. Default is False. - - Returns - ------- - ds : Xarray DataSet - Xarray Dataset containing the time-synced in-situ ground observations with - the inputed radar column + Safe to call concurrently from multiple threads. """ - # Check to see if input is xarray DataSet or a file path if DataSet: grd_ds = ground else: - # Read in the file using ACT - grd_ds = act.io.read_arm_netcdf(ground, cleanup_qc=True, drop_variables=discard) - # Default are Lazy Arrays; convert for matching with column - grd_ds = grd_ds.compute() - # check if a list containing new variable names exists. + _grd_raw = act.io.read_arm_netcdf( + ground, + cleanup_qc=True, + drop_variables=discard, + parallel=False, + ) + grd_ds = _grd_raw if prefix: - grd_ds = grd_ds.rename_vars({v: f"{prefix}{v}" for v in grd_ds.data_vars}) + if prefix == "wxt_": + rename_dict = { + v: f"{prefix}{v}" for v in grd_ds.data_vars if "wxt_" not in v + } + else: + rename_dict = {v: f"{prefix}{v}" for v in grd_ds.data_vars} + + grd_ds = grd_ds.rename_vars(rename_dict) - # Remove Base_Time before Resampling Data since you can't force 1 datapoint to 5 min sum if "base_time" in grd_ds.data_vars: del grd_ds["base_time"] - # Check to see if height is a dimension within the ground instrumentation. - # If so, first interpolate heights to match radar, before interpolating time. if "height" in grd_ds.dims: - grd_ds = grd_ds.interp(height=column["height"], method="linear") + if grd_ds["height"].attrs["units"] == "km": + grd_ds["height"] = grd_ds["height"] * 1000 + grd_ds["height"].attrs["units"] = "m" + grd_ds = grd_ds.interp(height=column_height, method="linear") if "range" in grd_ds.dims: - grd_ds = grd_ds.interp(range=column["height"], method="linear") + grd_ds = grd_ds.interp(range=column_height, method="linear") grd_ds = grd_ds.drop_vars("height") grd_ds = grd_ds.rename({"range": "height"}) - # Resample the ground data to 5 min and interpolate to the radar time. - # Keep data variable attributes to help distingish between instruments/locations - # Keep only numeric data variables to avoid issues with resampling non-numeric variables (e.g. lat/lon) non_numeric_vars = [ var for var in grd_ds.data_vars if not np.issubdtype(grd_ds[var].dtype, np.number) ] - grd_ds = grd_ds.drop_vars(non_numeric_vars) + if resample == "mean": matched = ( grd_ds.resample(time=resample_time, closed="right") .mean(keep_attrs=True) - .interp(time=column.time, method="linear") + .interp(time=column_time, method="linear") ) elif resample == "skip": - matched = grd_ds.interp(time=column.time, method="linear") + matched = grd_ds.interp(time=column_time, method="linear") elif resample == "sum": matched = ( grd_ds.resample(time=resample_time, closed="right") .sum(keep_attrs=True) - .interp(time=column.time, method="linear") + .interp(time=column_time, method="linear") ) else: raise ValueError( "Invalid resample method. Please choose 'mean', 'sum', or 'skip'." ) - # Add site location as a dimension for the Pluvio data matched = matched.assign_coords(coords=dict(station=site)) matched = matched.expand_dims("station") - # Remove Lat/Lon Data variables as it is included within the Matched Dataset with Site Identfiers - if "lat" in matched.data_vars: - del matched["lat"] - if "lon" in matched.data_vars: - del matched["lon"] - if "alt" in matched.data_vars: - del matched["alt"] - - # Update the individual Variables to Hold Global Attributes - # global attributes will be lost on merging into the matched dataset. - # Need to keep as many references and descriptors as possible + for attr in ("lat", "lon", "alt"): + if attr in matched.data_vars: + del matched[attr] + for var in matched.data_vars: matched[var].attrs.update(source=matched.datastream) + grd_ds.close() + _grd_raw.close() + return site, matched + - # Merge the two DataSets +def _apply_match(column, site, matched): + """Merge a prepared match result into ``column`` in-place and return it.""" for k in matched.data_vars: if k in column.data_vars: column[k].sel(station=site)[:] = matched.sel(station=site)[k][:].astype( @@ -442,10 +570,85 @@ def match_datasets_act( column[k] = ( column[k].fillna(column[k].attrs["missing_value"]).astype(float) ) - grd_ds.close() return column +def match_datasets_act( + column, + ground, + site, + discard, + resample="sum", + resample_time="5Min", + DataSet=False, + prefix=None, + verbose=False, +): + """ + Time synchronization of a Ground Instrumentation Dataset to + a Radar Column for Specific Locations using the ARM ACT package. + This module also supports vertically pointing radars such as the KAZR. + + Parameters + ---------- + column : Xarray DataSet + Xarray DataSet containing the extracted radar column above multiple locations. + Dimensions should include Time, Height, Site + + ground : str; Xarray DataSet + String containing the path of the ground instrumentation file that is desired + to be included within the extracted radar column dataset. + If DataSet is set to True, ground is Xarray Dataset and will skip I/O. + + site : str + Location of the ground instrument. Should be included within the filename. + + discard : list + List containing the desired input ground instrumentation variables to be + removed from the xarray DataSet. + + resample : str + Mathematical operational for resampling ground instrumentation to the radar time. + 'sum' will sum the ground data within the resample_time window, + 'mean' will average the ground data within the resample_time window, + and 'skip' will not resample the data and will only interpolate to the radar time. + Default is 'sum'. + + resample_time : str + Time resolution for resampling ground instrumentation data before mapping to radar time. + Default is "5Min". + + DataSet : boolean + Boolean flag to determine if ground input is an Xarray Dataset. + Set to True if ground input is Xarray DataSet. + + prefix : str + prefix for the desired spelling of variable names for the input + datastream (to fix duplicate variable names between instruments) + + verbose : boolean + Boolean flag to set verbose output during processing. Default is False. + + Returns + ------- + ds : Xarray DataSet + Xarray Dataset containing the time-synced in-situ ground observations with + the inputed radar column + """ + _, matched = _prepare_match( + ground, + site, + discard, + column.time, + column["height"], + resample=resample, + resample_time=resample_time, + DataSet=DataSet, + prefix=prefix, + ) + return _apply_match(column, site, matched) + + def _add_station_vars(ds, sites, site_alt): ds["station"] = sites # Assign the Main and Supplemental Site altitudes diff --git a/src/radclss/vis/quicklooks.py b/src/radclss/vis/quicklooks.py index 78f36a0..a951dfe 100644 --- a/src/radclss/vis/quicklooks.py +++ b/src/radclss/vis/quicklooks.py @@ -102,6 +102,9 @@ def create_radclss_columns( ) ).plot(y="height", ax=axarr[row, col], vmin=vmin, vmax=vmax, **kwargs) + if isinstance(radclss, str): + ds.close() + return fig, axarr diff --git a/tests/test_radclss.py b/tests/test_radclss.py index f2c3bea..aee3a0b 100644 --- a/tests/test_radclss.py +++ b/tests/test_radclss.py @@ -18,23 +18,24 @@ def test_radclss_serial(): if not username or not token: return # Skip test if credentials are not set - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) - - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-18T00:00:00", - "2025-06-20T00:00:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) + if glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfinterpolatedsondeM1.c1", + "2025-06-18T00:00:00", + "2025-06-20T00:00:00", + output=test_data_path, + ) for files in arm_test_data.DATASETS.registry.keys(): if "bnf" in files: if not os.path.exists(os.path.join(test_data_path, files)): @@ -43,7 +44,7 @@ def test_radclss_serial(): rad_path = os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc") radar_files = sorted(glob.glob(rad_path)) sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) + glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*")) ) vd_M1_files = glob.glob(os.path.join(test_data_path, "*bnfvdisquantsM1.c1*nc")) @@ -117,20 +118,24 @@ def test_radclss_serial(): ) assert not (my_columns["sonde_v_wind"].sel(station=site) == missing_value).all() missing_value = ( - my_columns["sonde_tdry"].sel(station=site).attrs.get("missing_value", None) + my_columns["sonde_temp"].sel(station=site).attrs.get("missing_value", None) ) - assert not (my_columns["sonde_tdry"].sel(station=site) == missing_value).all() + assert not (my_columns["sonde_temp"].sel(station=site) == missing_value).all() missing_value = ( my_columns["sonde_rh"].sel(station=site).attrs.get("missing_value", None) ) assert not (my_columns["sonde_rh"].sel(station=site) == missing_value).all() missing_value = ( - my_columns["sonde_pres"].sel(station=site).attrs.get("missing_value", None) + my_columns["sonde_bar_pres"] + .sel(station=site) + .attrs.get("missing_value", None) ) - assert not (my_columns["sonde_pres"].sel(station=site) == missing_value).all() + assert not ( + my_columns["sonde_bar_pres"].sel(station=site) == missing_value + ).all() # Met data check - for site in ["M1", "S20", "S30", "S40", "S13"]: + for site in ["M1", "S20", "S30", "S40"]: missing_value = ( my_columns["temp_mean"].sel(station=site).attrs.get("_FillValue", None) ) @@ -190,24 +195,25 @@ def test_radclss_parallel(): token = os.getenv("ARM_PASSWORD") if not username or not token: return # Skip test if credentials are not set - - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) - - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-18T00:00:00", - "2025-06-20T00:00:00", - output=test_data_path, - ) + # If data is not cached already, download the necessary data for testing + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) + if glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfinterpolatedsondeM1.c1", + "2025-06-18T00:00:00", + "2025-06-20T00:00:00", + output=test_data_path, + ) for files in arm_test_data.DATASETS.registry.keys(): if "bnf" in files: if not os.path.exists(os.path.join(test_data_path, files)): @@ -216,7 +222,7 @@ def test_radclss_parallel(): rad_path = os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc") radar_files = sorted(glob.glob(rad_path)) sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) + glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*")) ) vd_M1_files = glob.glob(os.path.join(test_data_path, "*bnfvdisquantsM1.c1*nc")) @@ -290,20 +296,24 @@ def test_radclss_parallel(): ) assert not (my_columns["sonde_v_wind"].sel(station=site) == missing_value).all() missing_value = ( - my_columns["sonde_tdry"].sel(station=site).attrs.get("missing_value", None) + my_columns["sonde_temp"].sel(station=site).attrs.get("missing_value", None) ) - assert not (my_columns["sonde_tdry"].sel(station=site) == missing_value).all() + assert not (my_columns["sonde_temp"].sel(station=site) == missing_value).all() missing_value = ( my_columns["sonde_rh"].sel(station=site).attrs.get("missing_value", None) ) assert not (my_columns["sonde_rh"].sel(station=site) == missing_value).all() missing_value = ( - my_columns["sonde_pres"].sel(station=site).attrs.get("missing_value", None) + my_columns["sonde_bar_pres"] + .sel(station=site) + .attrs.get("missing_value", None) ) - assert not (my_columns["sonde_pres"].sel(station=site) == missing_value).all() + assert not ( + my_columns["sonde_bar_pres"].sel(station=site) == missing_value + ).all() # Met data check - for site in ["M1", "S20", "S30", "S40", "S13"]: + for site in ["M1", "S20", "S30", "S40"]: missing_value = ( my_columns["temp_mean"].sel(station=site).attrs.get("_FillValue", None) ) @@ -363,23 +373,24 @@ def test_subset_points(): if not username or not token: return # Skip test if credentials are not set - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) - - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-18T00:00:00", - "2025-06-20T00:00:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) + if glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfsondewnpnM1.b1", + "2025-06-18T00:00:00", + "2025-06-20T00:00:00", + output=test_data_path, + ) rad_path = os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc") radar_files = sorted(glob.glob(rad_path)) @@ -394,13 +405,11 @@ def test_subset_points(): assert np.array_equal(subset_ds["height"].values, np.arange(500, 8500, 250)) assert "sonde_u_wind" not in subset_ds.data_vars assert "sonde_v_wind" not in subset_ds.data_vars - assert "sonde_tdry" not in subset_ds.data_vars + assert "sonde_temp" not in subset_ds.data_vars assert "sonde_rh" not in subset_ds.data_vars # Test with rawinsonde input instead of sonde=False - sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) - ) + sonde_files = sorted(glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*"))) subset_ds = radclss.util.subset_points( radar_files[0], input_site_dict, sonde=sonde_files @@ -414,7 +423,6 @@ def test_subset_points(): assert "sonde_rh" in subset_ds.data_vars assert "sonde_u_wind" in subset_ds.data_vars assert "sonde_v_wind" in subset_ds.data_vars - assert "sonde_tdry" in subset_ds.data_vars assert "sonde_rh" in subset_ds.data_vars @@ -431,34 +439,37 @@ def test_radclss_with_kasacr(): return # Skip test if credentials are not set # Download CSAPR2 data - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-01T12:00:00", - "2025-06-01T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-01T12:00:00", + "2025-06-01T12:30:00", + output=test_data_path, + ) # Download KASACR data - act.discovery.download_arm_data( - username, - token, - "bnfkasacrcfrS4.a1", - "2025-06-01T12:00:00", - "2025-06-01T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfkasacrcfrS4.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfkasacrcfrS4.a1", + "2025-06-01T12:00:00", + "2025-06-01T12:30:00", + output=test_data_path, + ) # Download sonde data - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-01T00:00:00", - "2025-06-02T00:00:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfinterpolatedsondeM1.c1", + "2025-06-01T00:00:00", + "2025-06-02T00:00:00", + output=test_data_path, + ) # Fetch any other test data for files in arm_test_data.DATASETS.registry.keys(): @@ -474,7 +485,7 @@ def test_radclss_with_kasacr(): glob.glob(os.path.join(test_data_path, "*bnfkasacrcfrS4.a1*.nc")) ) sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) + glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*")) ) # Gather other instrument files @@ -573,34 +584,37 @@ def test_radclss_with_kazr(): return # Skip test if credentials are not set # Download CSAPR2 data - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-01T12:00:00", - "2025-06-01T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-01T12:00:00", + "2025-06-01T12:30:00", + output=test_data_path, + ) # Download KASACR data - act.discovery.download_arm_data( - username, - token, - "bnfkazr2cfrprM1.a1", - "2025-06-01T12:00:00", - "2025-06-01T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfkazr2cfrprM1.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfkazr2cfrprM1.a1", + "2025-06-01T12:00:00", + "2025-06-01T12:30:00", + output=test_data_path, + ) # Download sonde data - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-01T00:00:00", - "2025-06-02T00:00:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfinterpolatedsondeM1.c1", + "2025-06-01T00:00:00", + "2025-06-02T00:00:00", + output=test_data_path, + ) # Fetch any other test data for files in arm_test_data.DATASETS.registry.keys(): @@ -616,7 +630,7 @@ def test_radclss_with_kazr(): glob.glob(os.path.join(test_data_path, "*bnfkazr2cfrprM1.a1*.nc")) ) sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) + glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*")) ) # Gather other instrument files @@ -715,14 +729,15 @@ def test_radclss_parallel_with_nexrad(): return # Skip test if credentials are not set # Download CSAPR2 data - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) # Gather all the radar files csapr2_files = sorted( @@ -807,34 +822,37 @@ def test_radclss_parallel_with_kasacr(): return # Skip test if credentials are not set # Download CSAPR2 data - act.discovery.download_arm_data( - username, - token, - "bnfcsapr2cfrS3.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfcsapr2cfrS3.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfcsapr2cfrS3.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) # Download KASACR data - act.discovery.download_arm_data( - username, - token, - "bnfkasacrcfrS4.a1", - "2025-06-19T12:00:00", - "2025-06-19T12:30:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfkasacrcfrS4.a1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfkasacrcfrS4.a1", + "2025-06-19T12:00:00", + "2025-06-19T12:30:00", + output=test_data_path, + ) # Download sonde data - act.discovery.download_arm_data( - username, - token, - "bnfsondewnpnM1.b1", - "2025-06-18T00:00:00", - "2025-06-20T00:00:00", - output=test_data_path, - ) + if glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*.nc")) == []: + act.discovery.download_arm_data( + username, + token, + "bnfinterpolatedsondeM1.c1", + "2025-06-18T00:00:00", + "2025-06-20T00:00:00", + output=test_data_path, + ) # Fetch any other test data for files in arm_test_data.DATASETS.registry.keys(): @@ -850,7 +868,7 @@ def test_radclss_parallel_with_kasacr(): glob.glob(os.path.join(test_data_path, "*bnfkasacrcfrS4.a1*.nc")) ) sonde_files = sorted( - glob.glob(os.path.join(test_data_path, "*bnfsondewnpnM1.b1*cdf")) + glob.glob(os.path.join(test_data_path, "*bnfinterpolatedsondeM1.c1*")) ) # Gather other instrument files