Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion cwmscli/usgs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,17 @@ def usgs_group():
type=str,
help='Backfill timeseries ids, use list of timeseries ids (e.g. "ts_id1, ts_id2") to attempt to backfill a subset of timeseries with USGS data',
)
@click.option(
"-bv",
"--backfill-version",
default=None,
type=str,
help='Backfill version, save data to a different version of the timeseries (e.g. "Rev-USGS" instead of "Raw-USGS")',
)
@requires(reqs.cwms, reqs.requests)
def getusgs_timeseries(office, days_back, api_root, api_key, api_key_loc, backfill):
def getusgs_timeseries(
office, days_back, api_root, api_key, api_key_loc, backfill, backfill_version
):
from cwmscli.usgs.getusgs_cda import getusgs_cda

if backfill is not None:
Expand All @@ -57,6 +66,52 @@ def getusgs_timeseries(office, days_back, api_root, api_key, api_key_loc, backfi
days_back=days_back,
api_key=api_key,
backfill_tsids=backfill_list,
backfill_version=backfill_version,
)


@usgs_group.command(
"timeseries-v2",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could get a bit confusing to have multiple sub commands for timeseries.

What are your thoughts on having an api-source with it defaulting to v1 until that goes away? Then you add a warning in this version+ that says

"switch to using source --version=2 as soon as possible"

Something like cwms-cli usgs timeseries --version=2

They're almost certainly going to make a v3, v4, v5 and so on

That would make the command look like this

commands:
  timeseries
  timeseries-v2
  timeseries-v3
  timeseries-v4
  timeseries-v5
  ...
  timeseries-v349

or you could even be more explicit and say --api-version

Then we continue to default to v1, then when USGS sunsets or at some point we just remove v1 as the default and/or attempt backwards compat. Same inputs but to a different api endpoint if possible.

help="Get USGS timeseries values using the new OGC API and store into CWMS database",
)
@office_option
@days_back_option
@api_root_option
@api_key_option
@api_key_loc_option
@click.option(
"-b",
"--backfill",
default=None,
type=str,
help='Backfill timeseries ids, use list of timeseries ids (e.g. "ts_id1, ts_id2") to attempt to backfill a subset of timeseries with USGS data',
)
@click.option(
"-bv",
"--backfill-version",
default=None,
type=str,
help='Backfill version, save data to a different version of the timeseries (e.g. "Rev-USGS" instead of "Raw-USGS")',
)
@requires(reqs.cwms, reqs.requests)
def getusgs_timeseries_v2(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then you'd put this inside the other getusgs_timeseries and have a switch for based on the version the enduser wants to target?

office, days_back, api_root, api_key, api_key_loc, backfill, backfill_version
):
from cwmscli.usgs.getusgs_cda import getusgs_cda_ogc

if backfill is not None:
backfill_list = [item.strip() for item in backfill.split(",") if item.strip()]
else:
backfill_list = None

api_key = get_api_key(api_key, api_key_loc)
getusgs_cda_ogc(
api_root=api_root,
office_id=office,
days_back=days_back,
api_key=api_key,
backfill_tsids=backfill_list,
backfill_version=backfill_version,
)


Expand Down
172 changes: 164 additions & 8 deletions cwmscli/usgs/getusgs_cda.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def getusgs_cda(
days_back: float,
api_key: str,
backfill_tsids: list = None,
backfill_version: str = None,
):
api_key = "apikey " + api_key
cwms.api.init_session(api_root=api_root, api_key=api_key)
Expand All @@ -60,6 +61,10 @@ def getusgs_cda(
if backfill_tsids:
USGS_ts = USGS_ts[USGS_ts["timeseries-id"].isin(backfill_tsids)]

USGS_ts = _validate_backfill_version_timeseries(
USGS_ts, backfill_version, office_id
)

if len(USGS_ts) > 0:
# grab all of the unique USGS stations numbers to be sent to USGS api
sites = USGS_ts[USGS_ts["USGS_Method_TS"].isna()].USGS_St_Num.unique()
Expand Down Expand Up @@ -88,7 +93,9 @@ def getusgs_cda(
if len(method_sites) > 0:
USGS_data_method = getUSGS_ts(method_sites, startDT, endDT, 3)

CWMS_writeData(USGS_ts, USGS_data, USGS_data_method, days_back)
CWMS_writeData(
USGS_ts, USGS_data, USGS_data_method, days_back, backfill_version
)
else:
if backfill_tsids:
_log_error_and_exit(
Expand Down Expand Up @@ -134,6 +141,9 @@ def get_USGS_params():
return USGS_Params


_USGS_PARAMS = get_USGS_params()


def get_CMWS_TS_Loc_Data(office):
"""
get time series group and location alias information and combine into singe dataframe
Expand All @@ -143,8 +153,8 @@ def get_CMWS_TS_Loc_Data(office):
def find_usgsparam(attribute, param):
if attribute > 0:
usgs_param = str(attribute).split(".")[0]
elif param in USGS_Params.index:
usgs_param = USGS_Params.at[param, "USGS_PARAMETER"]
elif param in _USGS_PARAMS.index:
usgs_param = _USGS_PARAMS.at[param, "USGS_PARAMETER"]
else:
usgs_param = "Not Found"
return usgs_param
Expand Down Expand Up @@ -236,7 +246,6 @@ def find_usgsparam(attribute, param):
[USGS_ts[USGS_ts["USGS_St_Num"].notnull()], USGS_ts_base], axis=0
)

USGS_Params = get_USGS_params()
# this code fills in the USGS_Params field with values in the Time Series Group Attribute if it exists. If it does not exist it
# grabs the default USGS paramter for the coresponding CWMS parameter
USGS_ts.attribute = USGS_ts.apply(
Expand Down Expand Up @@ -269,6 +278,7 @@ def getUSGS_ts(sites, startDT, endDT, access=None):
# "modifiedSince": "PT6H",
"siteStatus": "active",
}
query_dict = {k: v for k, v in query_dict.items() if v is not None}

r = requests.get(base_url, params=query_dict).json()

Expand All @@ -285,7 +295,63 @@ def getUSGS_ts(sites, startDT, endDT, access=None):
return USGS_data


def CWMS_writeData(USGS_ts, USGS_data, USGS_data_method, days_back):
def _replace_ts_version(ts_id: str, backfill_version: str) -> str:
"""Replace the version component of a timeseries ID.

The last dot-separated component is the version (e.g., Raw-USGS, Rev-USGS).
This function replaces it with the provided backfill_version.
"""
parts = ts_id.rsplit(".", 1)
if len(parts) == 2:
return f"{parts[0]}.{backfill_version}"
return ts_id


def _validate_backfill_version_timeseries(
usgs_ts: pd.DataFrame, backfill_version: str, office_id: str
) -> pd.DataFrame:
"""Validate that all timeseries with backfill_version exist in CWMS.

Returns dataframe with missing backfill_version timeseries removed.
"""
if backfill_version is None:
return usgs_ts

try:
pattern = f".*\\.{backfill_version}$"
response = cwms.get_timeseries_identifiers(
office_id=office_id, timeseries_id_regex=pattern
)

existing_ts = set()
if response.df is not None and not response.df.empty:
existing_ts = set(response.df["timeseries-id"].values)

missing_ts = []
missing_original_ts = []
for ts_id in usgs_ts["timeseries-id"].unique():
new_ts_id = _replace_ts_version(ts_id, backfill_version)
if new_ts_id not in existing_ts:
missing_ts.append(new_ts_id)
missing_original_ts.append(ts_id)

if missing_ts:
logging.warning(
f"The following timeseries with backfill_version '{backfill_version}' do not exist in CWMS and will be skipped: {missing_ts}"
)
usgs_ts = usgs_ts[~usgs_ts["timeseries-id"].isin(missing_original_ts)]

except Exception as e:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching every error generically will fail, but then continue anyways. We may want to return here so we don't continue anyways if we had failures?

And/or handle explicit exceptions?

logging.warning(
f"Could not verify timeseries with backfill_version for office {office_id}: {e}"
)

return usgs_ts


def CWMS_writeData(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opinion on this

Do not start a function/def with uppercase only do that with classes

Then we mix other case as well

PEP 8 would be lower_snake_case and would otherwise be conventionally cwms_write_data

See this in other variable names/ functions that mix camelCase and capitalization

USGS_ts, USGS_data, USGS_data_method, days_back, backfill_version=None
):
# lists to hold time series that fail
# noData -> usgs location and parameter were present in USGS api but the values were empty
# NotinAPI -> usgs location and parameter were not retrieved from USGS api
Expand Down Expand Up @@ -371,10 +437,20 @@ def CWMS_writeData(USGS_ts, USGS_data, USGS_data_method, days_back):
office = row["office-id"]
values["quality-code"] = 0

# apply backfill_version if provided
store_ts_id = (
_replace_ts_version(ts_id, backfill_version)
if backfill_version
else ts_id
)

# write values to CWMS database
try:
data = cwms.timeseries_df_to_json(
data=values, ts_id=ts_id, units=units, office_id=office
data=values,
ts_id=store_ts_id,
units=units,
office_id=office,
)
if days_back < 365:
cwms.store_timeseries(data)
Expand All @@ -383,13 +459,13 @@ def CWMS_writeData(USGS_ts, USGS_data, USGS_data_method, days_back):
data, max_workers=30, chunk_size=30 * 24 * 4
)
logging.info(
f"SUCCESS Data stored in CWMS database for --> {ts_id},{USGS_Id_param}"
f"SUCCESS Data stored in CWMS database for --> {store_ts_id},{USGS_Id_param}"
)
saved = saved + 1
except Exception as error:
storErr.append([ts_id, USGS_Id_param, error])
logging.error(
f"FAIL Data could not be stored to CWMS database for --> {ts_id},{USGS_Id_param} CDA error = {error}"
f"FAIL Data could not be stored to CWMS database for --> {store_ts_id},{USGS_Id_param} CDA error = {error}"
)
except Exception as error:
logging.error(
Expand All @@ -414,3 +490,83 @@ def CWMS_writeData(USGS_ts, USGS_data, USGS_data_method, days_back):
logging.info(
f"The following ts_ids errored because multiple method TSID were present for the USGS station. A USGS method TSID needs to be defined in the time series group in CWMS or an incorrect TSID is defined. {mult_ids}"
)


_OGC_BASE_URL = "https://api.waterdata.usgs.gov/ogcapi/v0/collections/continuous"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we split the API domain/endpoints out from this and centralize them in the init.py so we will have one place to change them later on



def getUSGS_ts_ogc(sites, startDT, endDT, access=None):
"""Fetch USGS instantaneous values from the OGC API endpoint.

Produces the same DataFrame structure as getUSGS_ts() so CWMS_writeData() works unchanged.

TODO: fill in OGC response parsing once sample JSON is available.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are heading towards a requests call?

We use dataretrieval in other places, do we know if there are plans to update it to v2? (or 1 since they use v0 now) - imo we keep the api target the same as theres and say v0 default, v1 for current to match?

(it's v2 for us sure)

https://github.com/DOI-USGS/dataretrieval-python

i.e.

from dataretrieval import waterdata

df, metadata = waterdata.get_continuous(
    monitoring_location_id="USGS-01646500",
    parameter_code="00065",
    time="2024-10-01/2025-09-30",
)

"""
raise NotImplementedError(
"getUSGS_ts_ogc() is not yet implemented. "
"OGC API response parsing is pending. Use 'usgs timeseries' for the legacy endpoint."
)


def getusgs_cda_ogc(
api_root: str,
office_id: str,
days_back: float,
api_key: str,
backfill_tsids: list = None,
backfill_version: str = None,
):
"""Fetch USGS time series data using the new OGC API and store into CWMS.

This is the OGC API variant of getusgs_cda(). The shared data fetching and writing
logic (get_CMWS_TS_Loc_Data, CWMS_writeData) is reused unchanged.
"""
api_key = "apikey " + api_key
cwms.api.init_session(api_root=api_root, api_key=api_key)
logging.info(f"CDA connection: {api_root}")
logging.info(
f"Data will be grabbed and stored from USGS OGC API for past {days_back} days for office: {office_id}"
)
execution_date = datetime.now()

USGS_ts = get_CMWS_TS_Loc_Data(office_id)

if backfill_tsids:
USGS_ts = USGS_ts[USGS_ts["timeseries-id"].isin(backfill_tsids)]

USGS_ts = _validate_backfill_version_timeseries(
USGS_ts, backfill_version, office_id
)

if len(USGS_ts) > 0:
sites = USGS_ts[USGS_ts["USGS_Method_TS"].isna()].USGS_St_Num.unique()
method_sites = USGS_ts[USGS_ts["USGS_Method_TS"].notna()].USGS_St_Num.unique()
logging.info(f"Execution date {execution_date}")

tw_delta = -timedelta(days_back)
startDT = execution_date + tw_delta
endDT = execution_date + timedelta(hours=2)

logging.info(f"Grabbing data from USGS OGC API between {startDT} and {endDT}")

USGS_data = pd.DataFrame()
USGS_data_method = pd.DataFrame()

if len(sites) > 0:
USGS_data = getUSGS_ts_ogc(sites, startDT, endDT)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both of these getUSGS_ts_ogccould throw an error if they do -h because of the NotImplementedError?

if len(method_sites) > 0:
USGS_data_method = getUSGS_ts_ogc(method_sites, startDT, endDT, 3)

CWMS_writeData(
USGS_ts, USGS_data, USGS_data_method, days_back, backfill_version
)
else:
if backfill_tsids:
_log_error_and_exit(
f"The following backfill time series ids were not present in the USGS time series or location alias groups: {backfill_tsids}"
)
else:
_log_error_and_exit(
f"No eligible USGS time series were found for office {office_id}.",
"Confirm that time series exist in Data Acquisition / USGS TS Data Acquisition and that matching entries exist in Agency Aliases / USGS Station Number.",
)
Loading
Loading