diff --git a/.gitignore b/.gitignore index 308de2a0c..90cd81cf0 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,9 @@ gradle.properties kdata pki/certs compose.env +etl.env .vscode **/node_modules/ -cwms-data-api/features.properties \ No newline at end of file +cwms-data-api/features.properties +cda-etl/logs +cda-etl/cache \ No newline at end of file diff --git a/cda-etl/Dockerfile b/cda-etl/Dockerfile new file mode 100644 index 000000000..eef22e200 --- /dev/null +++ b/cda-etl/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.14-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY src/ /app/src/ + +ENTRYPOINT ["python", "src/cda_etl/main.py"] diff --git a/cda-etl/build.gradle b/cda-etl/build.gradle new file mode 100644 index 000000000..00ae286b3 --- /dev/null +++ b/cda-etl/build.gradle @@ -0,0 +1,77 @@ +/* + * MIT License + * Copyright (c) 2026 Hydrologic Engineering Center + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +plugins { + id 'base' +} + +def isWindows = { + System.getProperty('os.name').toLowerCase().contains('win') +} + +final def pythonCmd = isWindows() ? 'python' : 'python3' +final def mainScript = 'src/cda_etl/main.py' +final def envFile = 'etl.env' +final def reqFile = 'requirements.txt' + +tasks.register('installRequirements', Exec) { + commandLine pythonCmd, '-m', 'pip', 'install', '-r', reqFile + workingDir projectDir + inputs.file(new File(projectDir, reqFile)) + outputs.file(new File(buildDir, 'pip-install.marker')) + doLast { + mkdir buildDir + new File(buildDir, 'pip-install.marker').text = "installed at ${new Date()}\n" + } +} + +tasks.register('runEtl', Exec) { + dependsOn 'installRequirements' + group 'application' + executable pythonCmd + args mainScript + workingDir projectDir + + // Load environment variables from etl.env + doFirst { + def envFileObj = new File(projectDir, envFile) + if (envFileObj.exists()) { + envFileObj.eachLine { line -> + if (line.trim() && !line.startsWith('#')) { + def parts = line.split('=', 2) + if (parts.length == 2) { + environment parts[0].trim(), parts[1].trim() + } + } + } + } else { + logger.warn("Environment file ${envFile} not found.") + } + } +} + +tasks.register('runEtlUnitTests', Exec) { + dependsOn 'installRequirements' + group 'verification' + executable pythonCmd + args '-m', 'pytest' + workingDir projectDir + environment 'PYTHONPATH', 'src/cda_etl' +} diff --git a/cda-etl/docker-compose.yml b/cda-etl/docker-compose.yml new file mode 100644 index 000000000..cffd17bfa --- /dev/null +++ b/cda-etl/docker-compose.yml @@ -0,0 +1,14 @@ +services: + cda-etl: + build: . + volumes: + - ./cache:/app/cache + - ./logs:/app/logs + environment: + -SOURCE_CDA_URL: + -SOURCE_CDA_API_KEY: + -DEST_CDA_URL: + -DEST_CDA_API_KEY: + -START_TIME: + -END_TIME: + -LOOKBACK: diff --git a/cda-etl/etl.env.example b/cda-etl/etl.env.example new file mode 100644 index 000000000..9ad467be9 --- /dev/null +++ b/cda-etl/etl.env.example @@ -0,0 +1,36 @@ +# +# MIT License +# Copyright (c) 2025 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +# Required settings +SOURCE_CDA_URL=https://cwms-data-test.cwbi.us/cwms-data/ +SOURCE_CDA_API_KEY= +DEST_CDA_URL=http://localhost:7000/cwms-data +DEST_CDA_API_KEY= +START_TIME=2025-01-01 +END_TIME=2026-01-01 + +# Optional settings +MAX_THREADS=10 +LOG_LEVEL=INFO + +# Data retrieval +LOCATIONS=SWT.EUFA-Dam +PROJECTS=SWT.EUFA +TIMESERIES=SWT.EUFA.Elev.Inst.1Hour.0.Ccp-Rev \ No newline at end of file diff --git a/cda-etl/requirements.txt b/cda-etl/requirements.txt new file mode 100644 index 000000000..008f5b339 --- /dev/null +++ b/cda-etl/requirements.txt @@ -0,0 +1,4 @@ +cwms-python +pytest +pytest-mock +pytest-cov \ No newline at end of file diff --git a/cda-etl/src/cda_etl/__init__.py b/cda-etl/src/cda_etl/__init__.py new file mode 100644 index 000000000..11cc650a8 --- /dev/null +++ b/cda-etl/src/cda_etl/__init__.py @@ -0,0 +1,17 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. diff --git a/cda-etl/src/cda_etl/config.py b/cda-etl/src/cda_etl/config.py new file mode 100644 index 000000000..43c74a006 --- /dev/null +++ b/cda-etl/src/cda_etl/config.py @@ -0,0 +1,79 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import os +import logging +from datetime import datetime + +logger = logging.getLogger(__name__) +DATE_TIME_FORMAT = "%Y-%m-%d" + + +class Config: + source_cda_url: str + source_cda_api_key: str + dest_cda_url: str + dest_cda_api_key: str + start_time: datetime + end_time: datetime + max_threads: int + locations: list[str] + projects: list[str] + timeseries: list[str] + + def __init__(self): + self.source_cda_url = os.getenv("SOURCE_CDA_URL") + self.source_cda_api_key = os.getenv("SOURCE_CDA_API_KEY") + self.dest_cda_url = os.getenv("DEST_CDA_URL") + self.dest_cda_api_key = os.getenv("DEST_CDA_API_KEY") + start_time = os.getenv("START_TIME") + end_time = os.getenv("END_TIME") + max_threads = os.getenv("MAX_THREADS", "1") + self.locations = os.getenv("LOCATIONS", "").split(",") + self.projects = os.getenv("PROJECTS", "").split(",") + self.timeseries = os.getenv("TIMESERIES", "").split(",") + + if not self.source_cda_url or not self.dest_cda_url: + raise ValueError("Missing required environment variables for CDA ETL configuration") + + if not self.source_cda_api_key: + logger.warning("Missing SOURCE_CDA_API_KEY environment variable.") + + if not self.dest_cda_api_key: + logger.warning("Missing DEST_CDA_API_KEY environment variable.") + + + if not start_time and not end_time: + raise ValueError("Must set both START_TIME and END_TIME in the format of %Y-%m-%d. Example:\n" + "START_TIME=2023-01-01\n" + "END_TIME=2023-01-31") + + try: + self.start_time = datetime.strptime(start_time, DATE_TIME_FORMAT) + except: + raise ValueError("START_TIME must be in the format of %Y-%m-%d") + try: + self.end_time = datetime.strptime(end_time, DATE_TIME_FORMAT) + except: + raise ValueError("END_TIME must be in the format of %Y-%m-%d") + + try: + self.max_threads = int(max_threads) + except ValueError: + raise ValueError("MAX_THREADS must be a number") + + logger.info(f"Configuration read: {str(self)}") \ No newline at end of file diff --git a/cda-etl/src/cda_etl/location.py b/cda-etl/src/cda_etl/location.py new file mode 100644 index 000000000..b722141da --- /dev/null +++ b/cda-etl/src/cda_etl/location.py @@ -0,0 +1,88 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +from concurrent.futures import ThreadPoolExecutor +import logging +import cwms +import utils.cache_util as cache_util +import utils.threading_util as threading_util + +logger = logging.getLogger(__name__) + + +def get_valid_locations(locations): + location_ids = [] + index = 0 + for location in locations: + splits = location.split(".") + if len(splits) != 2: + logger.warning(f"Invalid location at {index}: {location}\nExpected [officeid].[locationid]") + else: + logger.debug(f"Valid location found at {index}: {location}, splits: {splits}") + location_ids.append(splits) + index += 1 + + if not location_ids: + logger.warning("No valid locations provided for processing") + return location_ids + + +def cache_locations(locations): + # Validation + location_ids = get_valid_locations(locations) + + if not location_ids: + logger.warning("No valid locations found for retrieving") + return + + # Retrieval + threading_util.execute_tasks(_retrieve_one_location, location_ids) + + +def store_cached_locations(locations): + location_ids = get_valid_locations(locations) + + if not location_ids: + logger.warning("No valid locations found for retrieving") + return + + # Storage + threading_util.execute_tasks(_store_one_location, location_ids) + + +def _retrieve_one_location(location): + office_id = location[0] + location_id = location[1] + + logger.debug(f"Retrieving location data for office {office_id} and location {location_id}") + cache_data = cache_util.get_from_cache(office_id, "Locations", location_id) + if cache_data: + logger.debug(f"Location data found in cache for office {office_id} and location {location_id}") + else: + logger.debug(f"Location data not found in cache for office {office_id} and location {location_id}") + location_data = cwms.get_location(location_id, office_id).json + cache_util.put_in_cache(location_data, office_id, "Locations", location_id) + + +def _store_one_location(location): + office_id = location[0] + location_id = location[1] + location_data = cache_util.get_from_cache(office_id, "Locations", location_id) + if location_data: + cwms.store_location(location_data) + else: + logger.warning(f"Location data not found in cache for office {office_id} and location {location_id}") diff --git a/cda-etl/src/cda_etl/main.py b/cda-etl/src/cda_etl/main.py new file mode 100644 index 000000000..ddeb3d9e2 --- /dev/null +++ b/cda-etl/src/cda_etl/main.py @@ -0,0 +1,77 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import traceback +import sys +import logging +import os +import location +import project +import timeseries +import utils.threading_util +from datetime import datetime +from config import Config +from session_manager import SessionManager + +logger = logging.getLogger(__name__) + +def pipeline(config, session_manager): + session_manager.use_source_session() + + # Read and cache data + location.cache_locations(config.locations) + project.cache_projects(config.projects) + timeseries.cache_timeseries(config.timeseries, config.start_time, config.end_time) + + session_manager.use_dest_session() + # Store cached data, so we're not keeping it all in memory + location.store_cached_locations(config.locations) + project.store_cached_projects(config.projects) + timeseries.store_cached_timeseries(config.timeseries, config.start_time, config.end_time) + + +def init(): + config = Config() + session_manager = SessionManager(config) + utils.threading_util.init_executor(config.max_threads) + return config, session_manager + + +if __name__ == "__main__": + now = datetime.now() + log_level_str = os.getenv("LOG_LEVEL", "INFO").upper() + log_level = getattr(logging, log_level_str, logging.INFO) + + logging.basicConfig(filename=f'logs/cda-etl-{now.strftime("%Y%m%d%H%M%S")}.log', level=log_level) + + logger.debug(f"Using log level: {log_level_str}") + + try: + config, session_manager = init() + + try: + pipeline(config, session_manager) + except Exception as e: + logger.error(f"Unhandled exception occurred during ETL pipeline execution: {e}") + traceback.print_exc() + sys.exit(1) + + except Exception as e: + logger.error(f"Unhandled exception occurred during initialization: {e}") + traceback.print_exc() + sys.exit(1) + diff --git a/cda-etl/src/cda_etl/project.py b/cda-etl/src/cda_etl/project.py new file mode 100644 index 000000000..43681adfe --- /dev/null +++ b/cda-etl/src/cda_etl/project.py @@ -0,0 +1,81 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import logging +import utils.threading_util as threading_util +import utils.cache_util as cache_util +import location +import cwms +logger = logging.getLogger(__name__) + + +def cache_projects(projects): + # Make sure we have project locations downloaded + location.cache_locations(projects) + + # Validation + project_ids = location.get_valid_locations(projects) + + if not project_ids: + logger.warning("No valid project identifiers found for processing") + return + + # Retrieval + cwms.api.API_VERSION = 1 + threading_util.execute_tasks(_retrieve_one_project, project_ids) + cwms.api.API_VERSION = 2 + + +def store_cached_projects(projects): + # Validation + project_ids = location.get_valid_locations(projects) + + if not project_ids: + logger.warning("No valid project identifiers found for processing") + return + + location.store_cached_locations(projects) + + # Storage + threading_util.execute_tasks(_store_one_project, project_ids) + + +def _retrieve_one_project(project): + office_id = project[0] + project_id = project[1] + + logger.debug(f"API_VERSION before retrieving {project_id}: {cwms.api.API_VERSION}") + logger.debug(f"Retrieving project data for office {office_id} and project {project_id}") + cache_data = cache_util.get_from_cache(office_id, "Projects", project_id) + if cache_data: + logger.debug(f"Project data found in cache for office {office_id} and project {project_id}") + else: + logger.debug(f"Project data not found in cache for office {office_id} and project {project_id}, retrieving from CWMS") + project_data = cwms.get_project(office_id, project_id).json + cache_util.put_in_cache(project_data, office_id, "Projects", project_id) + + +def _store_one_project(project): + office_id = project[0] + project_id = project[1] + logger.debug(f"API_VERSION before retrieving {project_id}: {cwms.api.API_VERSION}") + project_data = cache_util.get_from_cache(office_id, "Projects", project_id) + if project_data: + cwms.store_project(project_data) + else: + logger.warning(f"Project data not found in cache for office {office_id} and location {project_id}") + diff --git a/cda-etl/src/cda_etl/session_manager.py b/cda-etl/src/cda_etl/session_manager.py new file mode 100644 index 000000000..2e41bf306 --- /dev/null +++ b/cda-etl/src/cda_etl/session_manager.py @@ -0,0 +1,36 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import cwms +import logging +logger = logging.getLogger(__name__) + +from config import Config + + +class SessionManager: + config: Config + + def __init__(self, config): + self.config = config + + def use_source_session(self): + cwms.init_session(api_root=self.config.source_cda_url, api_key=self.config.source_cda_api_key) + + def use_dest_session(self): + logger.debug(f"Initializing destination session with URL: {self.config.dest_cda_url} and api_key: {self.config.dest_cda_api_key}") + cwms.init_session(api_root=self.config.dest_cda_url, api_key=self.config.dest_cda_api_key) \ No newline at end of file diff --git a/cda-etl/src/cda_etl/timeseries.py b/cda-etl/src/cda_etl/timeseries.py new file mode 100644 index 000000000..653775c6e --- /dev/null +++ b/cda-etl/src/cda_etl/timeseries.py @@ -0,0 +1,125 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import logging +import location +import utils.threading_util as threading_util +import utils.cache_util as cache_util +import cwms + +logger = logging.getLogger(__name__) +DATE_TIME_FORMAT = "%Y-%m-%d %H.%M.%S" + + +def cache_timeseries(timeseries, begin, end): + locations, ts_info = _validate_and_split_timeseries(timeseries, begin, end) + + # Make sure we have project locations downloaded + location.cache_locations(locations) + + # Retrieval of Identifier + threading_util.execute_tasks(_retrieve_one_ts_identifier, ts_info) + + # Retrieval of Data + threading_util.execute_tasks(_retrieve_one_ts_data, ts_info) + + +def store_cached_timeseries(timeseries, begin, end): + locations, ts_info = _validate_and_split_timeseries(timeseries, begin, end) + location.store_cached_locations(locations) + + # Storage of Identifier + threading_util.execute_tasks(_store_one_ts_id, ts_info) + + # Storage of Data + threading_util.execute_tasks(_store_one_ts_data, ts_info) + + +def _retrieve_one_ts_identifier(ts_info): + office_id = ts_info[0] + ts_id = ts_info[1] + + cache_data = cache_util.get_from_cache(office_id, "Timeseries Identifiers", ts_id, "id") + if cache_data: + logger.debug(f"Cached Timeseries Identifier for {office_id}.{ts_id}") + else: + logger.debug(f"Fetching Timeseries Identifier for {office_id}.{ts_id}") + data = cwms.get_timeseries_identifier(ts_id, office_id).json + cache_util.put_in_cache(data, office_id, "Timeseries Identifiers", ts_id, "id") + + +def _retrieve_one_ts_data(ts_info): + office_id = ts_info[0] + ts_id = ts_info[1] + begin = ts_info[2] + end = ts_info[3] + begin_str = begin.strftime(DATE_TIME_FORMAT) + end_str = end.strftime(DATE_TIME_FORMAT) + + cache_data = cache_util.get_from_cache(office_id, "Timeseries", ts_id, begin_str, end_str, "data") + if cache_data: + logger.debug(f"Cached Timeseries Data for {office_id}.{ts_id} from {begin_str} to {end_str}") + else: + logger.debug(f"Fetching Timeseries Data for {office_id}.{ts_id} from {begin_str} to {end_str}") + data = cwms.get_timeseries(ts_id, office_id, begin=begin, end=end).json + cache_util.put_in_cache(data, office_id, "Timeseries", ts_id, begin_str, end_str, "data") + + +def _store_one_ts_id(ts_info): + + office_id = ts_info[0] + ts_id = ts_info[1] + + cache_data = cache_util.get_from_cache(office_id, "Timeseries Identifiers", ts_id, "id") + cwms.store_timeseries_identifier(cache_data) + +def _store_one_ts_data(ts_info): + office_id = ts_info[0] + ts_id = ts_info[1] + begin = ts_info[2] + end = ts_info[3] + begin_str = begin.strftime(DATE_TIME_FORMAT) + end_str = end.strftime(DATE_TIME_FORMAT) + + cache_data = cache_util.get_from_cache(office_id, "Timeseries", ts_id, begin_str, end_str, "data") + cwms.store_timeseries(cache_data) + + +def _validate_and_split_timeseries(timeseries, begin, end): + # Validation + invalid_ts = [] + ts_ids_to_split = {} + for ts in timeseries: + splits = ts.split(".") + if len(splits) != 7: + logger.warning(f"Invalid time series identifier '{ts}' encountered. Expected format is '[office_id].[location].[parameter].[parameter_type].[interval].[duration].[version]'") + invalid_ts.append(ts) + else: + logger.debug(f"Valid time series identifier '{ts}'") + ts_ids_to_split[f"{splits[1]}.{splits[2]}.{splits[3]}.{splits[4]}.{splits[5]}.{splits[6]}"] = splits + + if not ts_ids_to_split: + logger.warning("No valid time series identifiers found for processing") + return + + locations = [] + ts_info = [] + for id, splits in ts_ids_to_split.items(): + locations.append(f"{splits[0]}.{splits[1]}") + ts_info.append([splits[0], id, begin, end]) + + return locations, ts_info diff --git a/cda-etl/src/cda_etl/utils/__init__.py b/cda-etl/src/cda_etl/utils/__init__.py new file mode 100644 index 000000000..11cc650a8 --- /dev/null +++ b/cda-etl/src/cda_etl/utils/__init__.py @@ -0,0 +1,17 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. diff --git a/cda-etl/src/cda_etl/utils/cache_util.py b/cda-etl/src/cda_etl/utils/cache_util.py new file mode 100644 index 000000000..2facc71db --- /dev/null +++ b/cda-etl/src/cda_etl/utils/cache_util.py @@ -0,0 +1,67 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import os +import json + +__folder = "./cache" + +def get_from_cache(*args): + """ + :param args: Path components for cache file + :return: Cached json data or None if not found + """ + path = _get_cache_path(*args) + if not path: + return None + + if not os.path.exists(path): + return None + + with open(path, 'r') as f: + return f.read() + + + + +def _get_cache_path(*args): + ''' + :param args: Path components for cache file + :return: Full path to cache file or None if the file doesn't exist + ''' + if not args: + return None + + # Convert args to list and ensure last element has .json extension + path_parts = list(args) + if not path_parts[-1].endswith('.json'): + path_parts[-1] = path_parts[-1] + '.json' + + # Build full path starting with __folder + full_path = os.path.join(__folder, *path_parts) + return full_path + + +def put_in_cache(value, *args): + path = _get_cache_path(*args) + + if not os.path.exists(path): + os.makedirs(os.path.dirname(path), exist_ok=True) + + with open(path, "w", encoding="utf-8") as file: + json.dump(value, file, indent=2) \ No newline at end of file diff --git a/cda-etl/src/cda_etl/utils/threading_util.py b/cda-etl/src/cda_etl/utils/threading_util.py new file mode 100644 index 000000000..4d6aacc3f --- /dev/null +++ b/cda-etl/src/cda_etl/utils/threading_util.py @@ -0,0 +1,46 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import logging +import traceback +from concurrent.futures import as_completed, ThreadPoolExecutor + +logger = logging.getLogger(__name__) + +EXECUTOR: ThreadPoolExecutor + +def init_executor(max_workers): + global EXECUTOR + EXECUTOR = ThreadPoolExecutor(max_workers=max_workers) + + +def execute_tasks(task_func, items): + """ + Executes a task function for each item in a list using the provided executor. + Returns a dictionary mapping futures to items. + """ + futures_to_items = { + EXECUTOR.submit(task_func, item): item + for item in items + } + + for future in as_completed(futures_to_items): + item = futures_to_items[future] + if future.exception(): + logger.warning(f"Exception occurred for {item}: {future.exception()}") + elif future.result(): + logger.debug(f"No error on execution for {item}") diff --git a/cda-etl/tests/cda_etl/__init__.py b/cda-etl/tests/cda_etl/__init__.py new file mode 100644 index 000000000..11cc650a8 --- /dev/null +++ b/cda-etl/tests/cda_etl/__init__.py @@ -0,0 +1,17 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. diff --git a/cda-etl/tests/cda_etl/test_config.py b/cda-etl/tests/cda_etl/test_config.py new file mode 100644 index 000000000..cdad0edcd --- /dev/null +++ b/cda-etl/tests/cda_etl/test_config.py @@ -0,0 +1,50 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import os +import sys +from datetime import datetime +from config import Config + +def test_config(): + os.environ["SOURCE_CDA_URL"] = "http://source" + os.environ["DEST_CDA_URL"] = "http://dest" + os.environ["START_TIME"] = "2023-01-01" + os.environ["END_TIME"] = "2023-01-31" + + # Test valid MAX_THREADS + os.environ["MAX_THREADS"] = "5" + config = Config() + assert config.max_threads == 5 + print("Test valid MAX_THREADS passed") + + # Test default MAX_THREADS + del os.environ["MAX_THREADS"] + config = Config() + assert config.max_threads == 1 + print("Test default MAX_THREADS passed") + + # Test invalid MAX_THREADS + os.environ["MAX_THREADS"] = "abc" + try: + Config() + print("Test invalid MAX_THREADS failed (no exception raised)") + except ValueError as e: + assert str(e) == "MAX_THREADS must be a number" + print("Test invalid MAX_THREADS passed (exception raised)") + diff --git a/cda-etl/tests/cda_etl/test_location.py b/cda-etl/tests/cda_etl/test_location.py new file mode 100644 index 000000000..3e77242e5 --- /dev/null +++ b/cda-etl/tests/cda_etl/test_location.py @@ -0,0 +1,98 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import pytest +from unittest.mock import MagicMock, patch +import location +from location import LocationData + +@pytest.fixture +def mock_config(): + config = MagicMock() + config.locations = ["SWT.TestLoc"] + return config + +@pytest.fixture +def mock_session_manager(): + return MagicMock() + +def test_process(mock_config, mock_session_manager, mocker): + mock_process_locations = mocker.patch("location.process_locations") + mock_process_locations.return_value = LocationData(["SWT.TestLoc"]) + + result = location.process(mock_config, mock_session_manager) + + mock_process_locations.assert_called_once_with(mock_config.locations, mock_session_manager) + assert result.location_ids == ["SWT.TestLoc"] + +def test_process_locations(mock_session_manager, mocker): + mock_execute = mocker.patch("utils.threading_util.execute_tasks") + + # Mock retrieval results: [[location_str, location_data], ...] + retrieval_results = [["SWT.TestLoc", {"name": "TestLoc"}]] + # Mock storage results: [[retrieval_result, storage_result], ...] + # where retrieval_result is ["SWT.TestLoc", {"name": "TestLoc"}] + storage_results = [[retrieval_results[0], {"name": "TestLoc"}]] + + mock_execute.side_effect = [retrieval_results, storage_results] + + locations = ["SWT.TestLoc"] + result = location.cache_locations(locations, mock_session_manager) + + assert mock_session_manager.use_source_session.called + assert mock_session_manager.use_dest_session.called + assert len(mock_execute.call_args_list) == 2 + assert result.location_ids == storage_results + +def test_retrieve_one_location_invalid_format(mocker): + logger_spy = mocker.spy(location.logger, "warning") + result = location._retrieve_one_location("invalid_location") + assert result is None + assert logger_spy.called + +def test_retrieve_one_location_from_cache(mocker): + mock_get_cache = mocker.patch("utils.cache_util.get_from_cache") + mock_get_cache.return_value = {"name": "CachedLoc"} + + result = location._retrieve_one_location("SWT.CachedLoc") + + assert result == {"name": "CachedLoc"} + mock_get_cache.assert_called_once_with("SWT", "CachedLoc") + +def test_retrieve_one_location_from_cwms(mocker): + mocker.patch("utils.cache_util.get_from_cache", return_value=None) + mock_put_cache = mocker.patch("utils.cache_util.put_in_cache") + mock_cwms_get = mocker.patch("cwms.get_location") + + mock_response = MagicMock() + mock_response.json = {"name": "CwmsLoc"} + mock_cwms_get.return_value = mock_response + + result = location._retrieve_one_location("SWT.CwmsLoc") + + assert result == {"name": "CwmsLoc"} + mock_cwms_get.assert_called_once_with("CwmsLoc", "SWT") + mock_put_cache.assert_called_once_with({"name": "CwmsLoc"}, "SWT", "CwmsLoc") + +def test_store_one_location(mocker): + mock_cwms_store = mocker.patch("cwms.store_location") + location_data = {"name": "TestLoc"} + + result = location._store_one_location(location_data) + + assert result == location_data + mock_cwms_store.assert_called_once_with(location_data) diff --git a/cda-etl/tests/cda_etl/test_project.py b/cda-etl/tests/cda_etl/test_project.py new file mode 100644 index 000000000..e1e19fa09 --- /dev/null +++ b/cda-etl/tests/cda_etl/test_project.py @@ -0,0 +1,96 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import pytest +from unittest.mock import MagicMock +import project +from project import ProjectData + +@pytest.fixture +def mock_config(): + config = MagicMock() + config.projects = ["SWT.TestProj"] + return config + +@pytest.fixture +def mock_session_manager(): + return MagicMock() + +def test_process(mock_config, mock_session_manager, mocker): + mock_process_projects = mocker.patch("project.process_projects") + mock_process_projects.return_value = ProjectData(["SWT.TestProj"]) + + result = project.process(mock_config, mock_session_manager) + + mock_process_projects.assert_called_once_with(mock_config.projects, mock_session_manager) + assert result.project_ids == ["SWT.TestProj"] + +def test_process_projects(mock_session_manager, mocker): + mocker.patch("location.process_locations") + mock_execute = mocker.patch("utils.threading_util.execute_tasks") + + retrieval_results = [["SWT.TestProj", {"name": "TestProj"}]] + storage_results = [[retrieval_results[0], {"name": "TestProj"}]] + + mock_execute.side_effect = [retrieval_results, storage_results] + + projects = ["SWT.TestProj"] + result = project.cache_projects(projects, mock_session_manager) + + assert mock_session_manager.use_source_session.called + assert mock_session_manager.use_dest_session.called + assert len(mock_execute.call_args_list) == 2 + assert result.project_ids == storage_results + +def test_retrieve_one_project_invalid_format(mocker): + logger_spy = mocker.spy(project.logger, "warning") + result = project._retrieve_one_project("invalid_project") + assert result is None + assert logger_spy.called + +def test_retrieve_one_project_from_cache(mocker): + mock_get_cache = mocker.patch("utils.cache_util.get_from_cache") + mock_get_cache.return_value = {"name": "CachedProj"} + + result = project._retrieve_one_project("SWT.CachedProj") + + assert result == {"name": "CachedProj"} + mock_get_cache.assert_called_once_with("SWT", "CachedProj") + +def test_retrieve_one_project_from_cwms(mocker): + mocker.patch("utils.cache_util.get_from_cache", return_value=None) + mock_put_cache = mocker.patch("utils.cache_util.put_in_cache") + mock_cwms_get = mocker.patch("cwms.get_project") + + mock_response = MagicMock() + mock_response.json = {"name": "CwmsProj"} + mock_cwms_get.return_value = mock_response + + result = project._retrieve_one_project("SWT.CwmsProj") + + assert result == {"name": "CwmsProj"} + mock_cwms_get.assert_called_once_with("SWT", "CwmsProj") + mock_put_cache.assert_called_once_with({"name": "CwmsProj"}, "SWT", "CwmsProj") + +def test_store_one_project(mocker): + mock_cwms_store = mocker.patch("cwms.store_project") + project_data = {"name": "TestProj"} + + result = project._store_one_project(project_data) + + assert result == project_data + mock_cwms_store.assert_called_once_with(project_data) diff --git a/cda-etl/tests/cda_etl/test_timeseries.py b/cda-etl/tests/cda_etl/test_timeseries.py new file mode 100644 index 000000000..b5bc43f23 --- /dev/null +++ b/cda-etl/tests/cda_etl/test_timeseries.py @@ -0,0 +1,140 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import pytest +from unittest.mock import MagicMock +import timeseries +from timeseries import TsCacheData + +@pytest.fixture +def mock_config(): + config = MagicMock() + config.timeseries = ["SWT.TestLoc.Flow.Inst.1Hour.0.Cda"] + config.start_time = "2026-01-01T00:00:00" + config.end_time = "2026-01-02T00:00:00" + return config + +@pytest.fixture +def mock_session_manager(): + return MagicMock() + +def test_process(mock_config, mock_session_manager, mocker): + mock_process_timeseries = mocker.patch("timeseries.process_timeseries") + mock_process_timeseries.return_value = TsCacheData([]) + + result = timeseries.process(mock_config, mock_session_manager) + + mock_process_timeseries.assert_called_once_with( + mock_config.timeseries, mock_config.start_time, mock_config.end_time, mock_session_manager + ) + assert isinstance(result, TsCacheData) + +def test_process_timeseries(mock_session_manager, mocker): + mocker.patch("location.process_locations") + mock_execute = mocker.patch("utils.threading_util.execute_tasks") + + # Mocking results for 3 calls to execute_tasks + # 1. Identifier retrieval + # 2. Identifier storage + # 3. Data retrieval + # 4. Data storage + mock_execute.side_effect = [[], [], [], ["success"]] + + ts_list = ["SWT.Loc.Flow.Inst.1Hour.0.Cda"] + begin = "2026-01-01" + end = "2026-01-02" + + result = timeseries.process_timeseries(ts_list, begin, end, mock_session_manager) + + assert mock_session_manager.use_source_session.called + assert mock_session_manager.use_dest_session.called + assert mock_execute.call_count == 4 + assert result.ts_data == ["success"] + +def test_process_timeseries_invalid_format(mock_session_manager, mocker): + mocker.patch("location.process_locations") + mocker.patch("utils.threading_util.execute_tasks", return_value=[]) + + ts_list = ["invalid.format"] + result = timeseries.process_timeseries(ts_list, "begin", "end", mock_session_manager) + assert result.ts_data == [] + +def test_retrieve_one_ts_identifier_cache(mocker): + mock_get_cache = mocker.patch("utils.cache_util.get_from_cache") + mock_get_cache.return_value = {"id": "CachedId"} + + ts_info = ["SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end"] + result = timeseries._retrieve_one_ts_identifier(ts_info) + + assert result == {"id": "CachedId"} + mock_get_cache.assert_called_once_with("SWT", "Loc.Flow.Inst.1Hour.0.Cda", "id") + +def test_retrieve_one_ts_identifier_cwms(mocker): + mocker.patch("utils.cache_util.get_from_cache", return_value=None) + mock_put_cache = mocker.patch("utils.cache_util.put_in_cache") + mock_cwms_get = mocker.patch("cwms.get_timeseries_identifier") + + mock_response = MagicMock() + mock_response.json = {"id": "CwmsId"} + mock_cwms_get.return_value = mock_response + + ts_info = ["SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end"] + result = timeseries._retrieve_one_ts_identifier(ts_info) + + assert result == {"id": "CwmsId"} + mock_cwms_get.assert_called_once_with("SWT", "Loc.Flow.Inst.1Hour.0.Cda") + mock_put_cache.assert_called_once_with({"id": "CwmsId"}, "SWT", "Loc.Flow.Inst.1Hour.0.Cda", "id") + +def test_retrieve_one_ts_data_cache(mocker): + mock_get_cache = mocker.patch("utils.cache_util.get_from_cache") + mock_get_cache.return_value = {"data": "CachedData"} + + ts_info = ["SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end"] + result = timeseries._retrieve_one_ts_data(ts_info) + + assert result == {"data": "CachedData"} + mock_get_cache.assert_called_once_with("SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end", "data") + +def test_retrieve_one_ts_data_cwms(mocker): + mocker.patch("utils.cache_util.get_from_cache", return_value=None) + mock_put_cache = mocker.patch("utils.cache_util.put_in_cache") + mock_cwms_get = mocker.patch("cwms.get_timeseries") + + mock_response = MagicMock() + mock_response.json = {"data": "CwmsData"} + mock_cwms_get.return_value = mock_response + + ts_info = ["SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end"] + result = timeseries._retrieve_one_ts_data(ts_info) + + assert result == {"data": "CwmsData"} + mock_cwms_get.assert_called_once_with("Loc.Flow.Inst.1Hour.0.Cda", "SWT", begin="begin", end="end") + mock_put_cache.assert_called_once_with({"data": "CwmsData"}, "SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end", "data") + +def test_store_one_ts_id(mocker): + mock_cwms_store = mocker.patch("cwms.store_timeseries_identifier") + data = {"id": "TestId"} + result = timeseries._store_one_ts_id(data) + assert result == data + mock_cwms_store.assert_called_once_with(data) + +def test_store_one_ts_data(mocker): + mock_cwms_store = mocker.patch("cwms.store_timeseries") + data = {"data": "TestData"} + result = timeseries._store_one_ts_data(data) + assert result == data + mock_cwms_store.assert_called_once_with(data) diff --git a/cda-etl/tests/resources/SWT/Locations/EUFA-Dam.json b/cda-etl/tests/resources/SWT/Locations/EUFA-Dam.json new file mode 100644 index 000000000..ff006e794 --- /dev/null +++ b/cda-etl/tests/resources/SWT/Locations/EUFA-Dam.json @@ -0,0 +1,22 @@ +{ + "office-id": "SWT", + "name": "EUFA-Dam", + "latitude": 35.3070419, + "longitude": -95.3627509, + "active": true, + "public-name": "Dam", + "long-name": "Dam", + "timezone-name": "US/Central", + "location-kind": "EMBANKMENT", + "nation": "US", + "state-initial": "OK", + "county-name": "Haskell", + "nearest-city": "Hoyt, OK", + "horizontal-datum": "NAD83", + "published-longitude": 95.3625, + "published-latitude": 35.306944444444, + "vertical-datum": "NGVD29", + "elevation": 497.99868766404194, + "bounding-office-id": "SWT", + "elevation-units": "ft" +} \ No newline at end of file diff --git a/cda-etl/tests/resources/SWT/Locations/EUFA.json b/cda-etl/tests/resources/SWT/Locations/EUFA.json new file mode 100644 index 000000000..35c719afc --- /dev/null +++ b/cda-etl/tests/resources/SWT/Locations/EUFA.json @@ -0,0 +1,24 @@ +{ + "office-id": "SWT", + "name": "EUFA", + "latitude": 35.3069444, + "longitude": -95.3625, + "active": true, + "public-name": "Eufaula Lake", + "long-name": "Eufaula Lake near Brooken, OK", + "description": "Eufaula Lake near Brooken, OK", + "timezone-name": "US/Central", + "location-type": "RESERVOIR", + "location-kind": "PROJECT", + "nation": "US", + "state-initial": "OK", + "county-name": "Haskell", + "nearest-city": "Hoyt, OK", + "horizontal-datum": "WGS84", + "published-longitude": 95.3625, + "published-latitude": 35.306944444444, + "vertical-datum": "NGVD29", + "elevation": 497.99868766404194, + "bounding-office-id": "SWT", + "elevation-units": "ft" +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index b4aa2658a..9ff3d9808 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,4 +15,5 @@ include ":cwms-data-api" include ":cda-gui" include ":docs" project(":docs").projectDir = file("docs") -include ":cda-client" \ No newline at end of file +include ":cda-client" +include ":cda-etl" \ No newline at end of file