Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cwmscli/commands/commands_cwms.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,15 @@ def shef_import_crit(filename, office, api_root, api_key, api_key_loc, dry_run):
# ================================================================================
@shef_group.command(
"import_infile",
help="Import SHEF .in file into timeseries group for SHEF file processing",
help=(
"Import a legacy exportShef .in configuration file into a CWMS "
"timeseries group (default category 'SHEF Export'). Each entry "
"becomes a group member whose alias-id encodes the SHEF location, "
"PE code, send code, duration, and (optionally) units. "
"If the bulk save fails, each TSID is validated via "
"get_timeseries_identifier; missing TSIDs are logged and skipped, "
"and the save is retried with the surviving entries."
),
)
@click.option(
"-f",
Expand Down
64 changes: 61 additions & 3 deletions cwmscli/commands/shef/import_infile.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,32 @@ def build_group_json(
# ---------------------------------------------------------------------------


def store_group(
def _filter_existing_tsids(
entries: list[dict],
office_id: str,
) -> list[dict]:
"""Probe each TSID via cwms.get_timeseries_identifier and drop missing ones."""
valid: list[dict] = []
for e in entries:
try:
cwms.get_timeseries_identifier(ts_id=e["tsid"], office_id=office_id)
valid.append(e)
except Exception as exc:
log.warning(
"Skipping missing TSID '%s' (%s)",
e["tsid"],
type(exc).__name__,
)
return valid


def _save(
group_json: dict,
group_id: str,
office_id: str,
fail_if_exists: bool,
) -> None:
"""POST (create) or PATCH (update) the timeseries group in CWMS."""
"""Single store/update attempt with no validation fallback."""
try:
cwms.store_timeseries_groups(group_json, fail_if_exists=fail_if_exists)
log.info("SUCCESS — group stored via store_timeseries_groups.")
Expand All @@ -606,7 +625,6 @@ def store_group(
exc,
)

# Fallback: update, replacing all assigned timeseries
cwms.update_timeseries_groups(
data=group_json,
group_id=group_id,
Expand All @@ -616,6 +634,44 @@ def store_group(
log.info("SUCCESS — group updated via update_timeseries_groups.")


def store_group(
group_json: dict,
group_id: str,
office_id: str,
fail_if_exists: bool,
entries: Optional[list[dict]] = None,
category_id: str = DEFAULT_CATEGORY,
) -> None:
"""POST (create) or PATCH (update) the timeseries group in CWMS.

On failure, validate each TSID individually, drop any that don't exist in
CWMS, and retry the save with the surviving entries.
"""
try:
_save(group_json, group_id, office_id, fail_if_exists)
return
except Exception as exc:
if entries is None:
raise
log.warning(
"Group save failed (%s: %s) — validating each TSID and retrying ...",
type(exc).__name__,
exc,
)

valid = _filter_existing_tsids(entries, office_id)
if not valid:
log.error("No valid TSIDs remain after validation — nothing to store.")
return
if len(valid) == len(entries):
log.error("All TSIDs validated but save still failed; re-raising.")
raise

log.info("Retrying save with %d/%d valid TSIDs.", len(valid), len(entries))
retry_json = build_group_json(valid, group_id, office_id, category_id)
_save(retry_json, group_id, office_id, fail_if_exists=False)


# ---------------------------------------------------------------------------
# Importable function for CLI integration
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -713,6 +769,8 @@ def import_shef_infile(
group_id=group_name,
office_id=office_id,
fail_if_exists=fail_if_exists,
entries=entries,
category_id=category_id,
)

log.info(
Expand Down
14 changes: 14 additions & 0 deletions tests/commands/fixtures/MRBWM_GAPT.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Missouri River Basin Water Management GRFT Forecast
# Extract CWMS into SHEF for MVS and MBRFC and post to LDM
#
debug 1
type .E
ti t t+31d
system eng
tz z
DB LOCAL

PE Flow-Out.*=QT
FORMAT QT(5.1)
ts Fcst-MRBWM-GRFT_TW=FX
GAPT.Flow-Out.Inst.6Hours.0.Fcst-MRBWM-GRFT_TW
113 changes: 112 additions & 1 deletion tests/commands/test_shef_import_infile_dry_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest
from click.testing import CliRunner

from cwmscli.commands.shef.import_infile import import_shef_infile
from cwmscli.commands.shef.import_infile import import_shef_infile, parse_in_file


@pytest.fixture
Expand Down Expand Up @@ -200,3 +200,114 @@ def test_import_shef_infile_parses_location_mappings(fake_cwms, caplog):
# Verify that location mappings were recognized (GENW3, LYNW3, GTTI4)
# These are the SHEF location IDs from the LOCATION directives
assert "GENW3" in output or "LYNW3" in output or "GTTI4" in output


# ---------------------------------------------------------------------------
# Regression tests: both .in file styles
# ---------------------------------------------------------------------------

_NWO_FIXTURE = Path(__file__).parent / "fixtures" / "MRBWM_GAPT.in"


def test_nwo_style_version_send_code():
"""ts VersionName=SEND_CODE maps a specific TSID version to a send code."""
if not _NWO_FIXTURE.exists():
pytest.skip(f"Fixture file not found: {_NWO_FIXTURE}")
_, entries = parse_in_file(_NWO_FIXTURE)
assert len(entries) == 1
assert entries[0]["send_code"] == "FX"


def test_nwo_style_location_fallback():
"""Without a LOCATION directive the CWMS location part is used as the SHEF ID."""
if not _NWO_FIXTURE.exists():
pytest.skip(f"Fixture file not found: {_NWO_FIXTURE}")
_, entries = parse_in_file(_NWO_FIXTURE)
assert len(entries) == 1
assert entries[0]["shef_loc"] == "GAPT"
assert entries[0]["tsid"] == "GAPT.Flow-Out.Inst.6Hours.0.Fcst-MRBWM-GRFT_TW"


def test_missing_tsid_falls_back_to_validated_save(tmp_path, monkeypatch, caplog):
"""If save fails, validate each TSID and retry with only the valid ones."""
import logging

from cwmscli.commands.shef import import_infile as mod

caplog.set_level(logging.INFO)

in_file = tmp_path / "missing.in"
in_file.write_text(
"PE Flow.*=QR\n"
"ts Best-MRBWM=RG\n"
"loc AAA=AAA\nAAA.Flow.Inst.1Hour.0.Best-MRBWM\n"
"loc BBB=BBB\nBBB.Flow.Inst.1Hour.0.Best-MRBWM\n"
"loc CCC=CCC\nCCC.Flow.Inst.1Hour.0.Best-MRBWM\n",
encoding="utf-8",
)

missing_tsid = "BBB.Flow.Inst.1Hour.0.Best-MRBWM"
store_calls: list[dict] = []

def fake_store(payload, fail_if_exists):
store_calls.append(payload)
# First call (with all 3 entries) fails; retry (with 2) succeeds.
if any(m["timeseries-id"] == missing_tsid for m in payload["members"]):
raise RuntimeError(f"DOES_NOT_EXIST: {missing_tsid}")

def fake_update(data, group_id, office_id, replace_assigned_ts):
# Force the inner _save fallback to also fail so store_group enters
# the per-TSID validation branch.
raise RuntimeError(f"DOES_NOT_EXIST: {missing_tsid}")

def fake_get_tsid(ts_id, office_id):
if ts_id == missing_tsid:
raise RuntimeError("404 Not Found")
return {"office-id": office_id, "timeseries-id": ts_id}

def fake_init_session(api_root, api_key=None, token=None):
pass

def fake_df_to_json(
data, group_id, group_office_id, category_office_id, category_id
):
return {
"group-id": group_id,
"office-id": group_office_id,
"category-id": category_id,
"members": data.to_dict("records"),
}

monkeypatch.setattr(mod.cwms, "store_timeseries_groups", fake_store)
monkeypatch.setattr(mod.cwms, "update_timeseries_groups", fake_update)
monkeypatch.setattr(mod.cwms, "get_timeseries_identifier", fake_get_tsid)
monkeypatch.setattr(mod.cwms, "timeseries_group_df_to_json", fake_df_to_json)
monkeypatch.setattr(mod.cwms_api, "init_session", fake_init_session)

import_shef_infile(
in_file=str(in_file),
group_name="Test Group",
office_id="MVP",
api_root="https://test.example.com/",
api_key="test-key",
)

# First store had 3 entries (failed), retry store had 2 (succeeded).
assert len(store_calls) == 2
assert len(store_calls[0]["members"]) == 3
assert len(store_calls[1]["members"]) == 2
retried_ids = {m["timeseries-id"] for m in store_calls[1]["members"]}
assert missing_tsid not in retried_ids
assert "Skipping missing TSID" in caplog.text
assert "Retrying save with 2/3 valid TSIDs" in caplog.text


def test_existing_style_location_and_wildcard_send_code():
"""Existing style: LOCATION directives + TS * = SEND_CODE still works."""
fixture_path = Path(__file__).parent / "fixtures" / "exportShef_CWMS_LD8-10.in"
if not fixture_path.exists():
pytest.skip(f"Fixture file not found: {fixture_path}")
_, entries = parse_in_file(fixture_path)
assert len(entries) > 0
genw3 = [e for e in entries if e["shef_loc"].startswith("GENW3")]
assert genw3, "Expected LOCATION mapping GENW3 for LockDam_08*"
Loading