From 571ef1117e3fe3648ad6214c7908c4adc79c2fd7 Mon Sep 17 00:00:00 2001 From: Ryan Miles Date: Thu, 14 May 2026 12:01:59 -0700 Subject: [PATCH 1/3] Add CDA-ETL module with initial implementation and basic functionality Includes the addition of the following: - Dockerfile and docker-compose for containerizing the service. - Gradle build configuration for the module. - Core ETL pipeline components: configuration, session management, location, project, and timeseries processing. - Environment variable management with `etl.env.example`. - Utility functions and cache handling. - Initial set of unit tests for basic validation (e.g., configuration handling). --- .gitignore | 5 +- cda-etl/Dockerfile | 10 ++++ cda-etl/build.gradle | 68 +++++++++++++++++++++ cda-etl/docker-compose.yml | 14 +++++ cda-etl/etl.env.example | 33 +++++++++++ cda-etl/requirements.txt | 1 + cda-etl/src/cda_etl/__init__.py | 17 ++++++ cda-etl/src/cda_etl/config.py | 79 +++++++++++++++++++++++++ cda-etl/src/cda_etl/location.py | 78 ++++++++++++++++++++++++ cda-etl/src/cda_etl/main.py | 66 +++++++++++++++++++++ cda-etl/src/cda_etl/project.py | 28 +++++++++ cda-etl/src/cda_etl/session_manager.py | 35 +++++++++++ cda-etl/src/cda_etl/timeseries.py | 22 +++++++ cda-etl/src/cda_etl/utils/__init__.py | 17 ++++++ cda-etl/src/cda_etl/utils/cache_util.py | 59 ++++++++++++++++++ cda-etl/tests/cda_etl/__init__.py | 17 ++++++ cda-etl/tests/cda_etl/verify_config.py | 52 ++++++++++++++++ settings.gradle | 3 +- 18 files changed, 602 insertions(+), 2 deletions(-) create mode 100644 cda-etl/Dockerfile create mode 100644 cda-etl/build.gradle create mode 100644 cda-etl/docker-compose.yml create mode 100644 cda-etl/etl.env.example create mode 100644 cda-etl/requirements.txt create mode 100644 cda-etl/src/cda_etl/__init__.py create mode 100644 cda-etl/src/cda_etl/config.py create mode 100644 cda-etl/src/cda_etl/location.py create mode 100644 cda-etl/src/cda_etl/main.py create mode 100644 cda-etl/src/cda_etl/project.py create mode 100644 cda-etl/src/cda_etl/session_manager.py create mode 100644 cda-etl/src/cda_etl/timeseries.py create mode 100644 cda-etl/src/cda_etl/utils/__init__.py create mode 100644 cda-etl/src/cda_etl/utils/cache_util.py create mode 100644 cda-etl/tests/cda_etl/__init__.py create mode 100644 cda-etl/tests/cda_etl/verify_config.py diff --git a/.gitignore b/.gitignore index 308de2a0cb..90cd81cf07 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,9 @@ gradle.properties kdata pki/certs compose.env +etl.env .vscode **/node_modules/ -cwms-data-api/features.properties \ No newline at end of file +cwms-data-api/features.properties +cda-etl/logs +cda-etl/cache \ No newline at end of file diff --git a/cda-etl/Dockerfile b/cda-etl/Dockerfile new file mode 100644 index 0000000000..eef22e2001 --- /dev/null +++ b/cda-etl/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.14-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY src/ /app/src/ + +ENTRYPOINT ["python", "src/cda_etl/main.py"] diff --git a/cda-etl/build.gradle b/cda-etl/build.gradle new file mode 100644 index 0000000000..6f987b1df8 --- /dev/null +++ b/cda-etl/build.gradle @@ -0,0 +1,68 @@ +/* + * MIT License + * Copyright (c) 2026 Hydrologic Engineering Center + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +plugins { + id 'base' +} + +def isWindows = { + System.getProperty('os.name').toLowerCase().contains('win') +} + +final def pythonCmd = isWindows() ? 'python' : 'python3' +final def mainScript = 'src/cda_etl/main.py' +final def envFile = 'etl.env' +final def reqFile = 'requirements.txt' + +tasks.register('installRequirements', Exec) { + commandLine pythonCmd, '-m', 'pip', 'install', '-r', reqFile + workingDir projectDir + inputs.file(new File(projectDir, reqFile)) + outputs.file(new File(buildDir, 'pip-install.marker')) + doLast { + mkdir buildDir + new File(buildDir, 'pip-install.marker').text = "installed at ${new Date()}\n" + } +} + +tasks.register('runEtl', Exec) { + dependsOn 'installRequirements' + group 'application' + executable pythonCmd + args mainScript + workingDir projectDir + + // Load environment variables from etl.env + doFirst { + def envFileObj = new File(projectDir, envFile) + if (envFileObj.exists()) { + envFileObj.eachLine { line -> + if (line.trim() && !line.startsWith('#')) { + def parts = line.split('=', 2) + if (parts.length == 2) { + environment parts[0].trim(), parts[1].trim() + } + } + } + } else { + logger.warn("Environment file ${envFile} not found.") + } + } +} diff --git a/cda-etl/docker-compose.yml b/cda-etl/docker-compose.yml new file mode 100644 index 0000000000..cffd17bfa6 --- /dev/null +++ b/cda-etl/docker-compose.yml @@ -0,0 +1,14 @@ +services: + cda-etl: + build: . + volumes: + - ./cache:/app/cache + - ./logs:/app/logs + environment: + -SOURCE_CDA_URL: + -SOURCE_CDA_API_KEY: + -DEST_CDA_URL: + -DEST_CDA_API_KEY: + -START_TIME: + -END_TIME: + -LOOKBACK: diff --git a/cda-etl/etl.env.example b/cda-etl/etl.env.example new file mode 100644 index 0000000000..a2b52675af --- /dev/null +++ b/cda-etl/etl.env.example @@ -0,0 +1,33 @@ +# +# MIT License +# Copyright (c) 2025 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +SOURCE_CDA_URL=https://cwms-data.usace.army.mil/cwms-data/ +SOURCE_CDA_API_KEY= +DEST_CDA_URL=http://localhost:7000/cwms-data +DEST_CDA_API_KEY= +START_TIME=2025-01-01 +END_TIME=2026-01-01 +MAX_THREADS=10 +LOG_LEVEL=INFO + +# Data retrieval +LOCATIONS=SWT.EUFA-Dam +PROJECTS=SWT.EUFA +TIMESERIES=EUFA.Elev.Inst.1Hour.0.Ccp-Rev \ No newline at end of file diff --git a/cda-etl/requirements.txt b/cda-etl/requirements.txt new file mode 100644 index 0000000000..420558645b --- /dev/null +++ b/cda-etl/requirements.txt @@ -0,0 +1 @@ +cwms-python \ No newline at end of file diff --git a/cda-etl/src/cda_etl/__init__.py b/cda-etl/src/cda_etl/__init__.py new file mode 100644 index 0000000000..11cc650a88 --- /dev/null +++ b/cda-etl/src/cda_etl/__init__.py @@ -0,0 +1,17 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. diff --git a/cda-etl/src/cda_etl/config.py b/cda-etl/src/cda_etl/config.py new file mode 100644 index 0000000000..43c74a006d --- /dev/null +++ b/cda-etl/src/cda_etl/config.py @@ -0,0 +1,79 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import os +import logging +from datetime import datetime + +logger = logging.getLogger(__name__) +DATE_TIME_FORMAT = "%Y-%m-%d" + + +class Config: + source_cda_url: str + source_cda_api_key: str + dest_cda_url: str + dest_cda_api_key: str + start_time: datetime + end_time: datetime + max_threads: int + locations: list[str] + projects: list[str] + timeseries: list[str] + + def __init__(self): + self.source_cda_url = os.getenv("SOURCE_CDA_URL") + self.source_cda_api_key = os.getenv("SOURCE_CDA_API_KEY") + self.dest_cda_url = os.getenv("DEST_CDA_URL") + self.dest_cda_api_key = os.getenv("DEST_CDA_API_KEY") + start_time = os.getenv("START_TIME") + end_time = os.getenv("END_TIME") + max_threads = os.getenv("MAX_THREADS", "1") + self.locations = os.getenv("LOCATIONS", "").split(",") + self.projects = os.getenv("PROJECTS", "").split(",") + self.timeseries = os.getenv("TIMESERIES", "").split(",") + + if not self.source_cda_url or not self.dest_cda_url: + raise ValueError("Missing required environment variables for CDA ETL configuration") + + if not self.source_cda_api_key: + logger.warning("Missing SOURCE_CDA_API_KEY environment variable.") + + if not self.dest_cda_api_key: + logger.warning("Missing DEST_CDA_API_KEY environment variable.") + + + if not start_time and not end_time: + raise ValueError("Must set both START_TIME and END_TIME in the format of %Y-%m-%d. Example:\n" + "START_TIME=2023-01-01\n" + "END_TIME=2023-01-31") + + try: + self.start_time = datetime.strptime(start_time, DATE_TIME_FORMAT) + except: + raise ValueError("START_TIME must be in the format of %Y-%m-%d") + try: + self.end_time = datetime.strptime(end_time, DATE_TIME_FORMAT) + except: + raise ValueError("END_TIME must be in the format of %Y-%m-%d") + + try: + self.max_threads = int(max_threads) + except ValueError: + raise ValueError("MAX_THREADS must be a number") + + logger.info(f"Configuration read: {str(self)}") \ No newline at end of file diff --git a/cda-etl/src/cda_etl/location.py b/cda-etl/src/cda_etl/location.py new file mode 100644 index 0000000000..272826a098 --- /dev/null +++ b/cda-etl/src/cda_etl/location.py @@ -0,0 +1,78 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +from dataclasses import dataclass +from concurrent.futures import ThreadPoolExecutor, as_completed +import logging +import cwms +import traceback +import utils.cache_util as util + +logger = logging.getLogger(__name__) + +@dataclass +class LocationData: + location_ids: list[str] + + +def process(config, session_manager): + return process_locations(config.locations, config.max_threads, session_manager) + + +def process_locations(locations, max_threads, session_manager): + session_manager.use_source_session() + + # Retrieval + results = [] + with ThreadPoolExecutor(max_workers=max_threads) as executor: + future_to_location = { + executor.submit(retrieve_one_location, location): location + for location in locations + } + + for future in as_completed(future_to_location): + location, id = future_to_location[future] + try: + result = future.result() + if result: + results.append(result) + else: + logger.warning(f"Location {id} not found") + except Exception as e: + logger.warning(f"Exception while retrieving location {id}: {e}") + traceback.print_exc() + + return LocationData(results) + + +def retrieve_one_location(location): + # Split out office id based on dot notation + splits = location.split(".") + + if len(splits) != 2: + logger.warning(f"Invalid location format: {location}\nExpected [officeid].[locationid]") + return None + + office_id = splits[0] + location_id = splits[1] + + util.get_from_cache(office_id, location_id) + + return cwms.get_location(location_id, office_id).json, location + + +def store_one_location(location_data, location_id): diff --git a/cda-etl/src/cda_etl/main.py b/cda-etl/src/cda_etl/main.py new file mode 100644 index 0000000000..38c94faa5b --- /dev/null +++ b/cda-etl/src/cda_etl/main.py @@ -0,0 +1,66 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import traceback +import sys +import logging +import os +import location +import project +import timeseries +from datetime import datetime +from config import Config +from session_manager import SessionManager + +logger = logging.getLogger(__name__) + +def pipeline(config, session_manager): + location_data = location.process(config, session_manager) + project_data = project.process(config, session_manager, location_data) + timeseries.process(config, session_manager, location_data) + + +def init(): + config = Config() + session_manager = SessionManager(config) + return config, session_manager + + +if __name__ == "__main__": + now = datetime.now() + log_level_str = os.getenv("LOG_LEVEL", "INFO").upper() + log_level = getattr(logging, log_level_str, logging.INFO) + + logging.basicConfig(filename=f'logs/cda-etl-{now.strftime("%Y%m%d%H%M%S")}.log', level=log_level) + + logger.debug(f"Using log level: {log_level_str}") + + try: + config, session_manager = init() + + try: + pipeline(config, session_manager) + except Exception as e: + logger.error(f"Unhandled exception occurred during ETL pipeline execution: {e}") + traceback.print_exc() + sys.exit(1) + + except Exception as e: + logger.error(f"Unhandled exception occurred during initialization: {e}") + traceback.print_exc() + sys.exit(1) + diff --git a/cda-etl/src/cda_etl/project.py b/cda-etl/src/cda_etl/project.py new file mode 100644 index 0000000000..7486f55184 --- /dev/null +++ b/cda-etl/src/cda_etl/project.py @@ -0,0 +1,28 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +from dataclasses import dataclass +import logging +logger = logging.getLogger(__name__) + +@dataclass +class ProjectData: + project_ids: list[str] + +def process(config, session_manager, location_data): + return ProjectData([]) + diff --git a/cda-etl/src/cda_etl/session_manager.py b/cda-etl/src/cda_etl/session_manager.py new file mode 100644 index 0000000000..72d8298155 --- /dev/null +++ b/cda-etl/src/cda_etl/session_manager.py @@ -0,0 +1,35 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import cwms + +class SessionManager: + source_session = None + dest_session = None + + def __init__(self, config): + self.dest_session = cwms.init_session(api_root=config.dest_cda_url, api_key=config.dest_cda_api_key) + self.source_session = cwms.init_session(api_root=config.source_cda_url, api_key=config.source_cda_api_key) + + def close(self): + pass + + def use_source_session(self): + cwms.api.SESSION = self.source_session + + def use_dest_session(self): + cwms.api.SESSION = self.dest_session \ No newline at end of file diff --git a/cda-etl/src/cda_etl/timeseries.py b/cda-etl/src/cda_etl/timeseries.py new file mode 100644 index 0000000000..dbae4590cd --- /dev/null +++ b/cda-etl/src/cda_etl/timeseries.py @@ -0,0 +1,22 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import logging +logger = logging.getLogger(__name__) + +def process(config, session_manager, location_data): + pass diff --git a/cda-etl/src/cda_etl/utils/__init__.py b/cda-etl/src/cda_etl/utils/__init__.py new file mode 100644 index 0000000000..11cc650a88 --- /dev/null +++ b/cda-etl/src/cda_etl/utils/__init__.py @@ -0,0 +1,17 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. diff --git a/cda-etl/src/cda_etl/utils/cache_util.py b/cda-etl/src/cda_etl/utils/cache_util.py new file mode 100644 index 0000000000..8d4034b986 --- /dev/null +++ b/cda-etl/src/cda_etl/utils/cache_util.py @@ -0,0 +1,59 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import os +import json + +__folder = "./cache" + +def get_from_cache(*args): + """ + :param args: Path components for cache file + :return: Cached json data or None if not found + """ + path = _get_cache_path(*args) + if not path: + return None + + if not os.path.exists(path): + return None + + with open(path, 'r') as f: + return json.load(f) + + +def _get_cache_path(*args): + ''' + :param args: Path components for cache file + :return: Full path to cache file or None if the file doesn't exist + ''' + if not args: + return None + + # Convert args to list and ensure last element has .json extension + path_parts = list(args) + if not path_parts[-1].endswith('.json'): + path_parts[-1] = path_parts[-1] + '.json' + + # Build full path starting with __folder + full_path = os.path.join(__folder, *path_parts) + return full_path + + +def put_in_cache(value, *args): + pass \ No newline at end of file diff --git a/cda-etl/tests/cda_etl/__init__.py b/cda-etl/tests/cda_etl/__init__.py new file mode 100644 index 0000000000..11cc650a88 --- /dev/null +++ b/cda-etl/tests/cda_etl/__init__.py @@ -0,0 +1,17 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. diff --git a/cda-etl/tests/cda_etl/verify_config.py b/cda-etl/tests/cda_etl/verify_config.py new file mode 100644 index 0000000000..f013df9140 --- /dev/null +++ b/cda-etl/tests/cda_etl/verify_config.py @@ -0,0 +1,52 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import os +import sys +from datetime import datetime +from cda_etl.config import Config + +def test_config(): + os.environ["SOURCE_CDA_URL"] = "http://source" + os.environ["DEST_CDA_URL"] = "http://dest" + os.environ["START_TIME"] = "2023-01-01" + os.environ["END_TIME"] = "2023-01-31" + + # Test valid MAX_THREADS + os.environ["MAX_THREADS"] = "5" + config = Config() + assert config.max_threads == 5 + print("Test valid MAX_THREADS passed") + + # Test default MAX_THREADS + del os.environ["MAX_THREADS"] + config = Config() + assert config.max_threads == 1 + print("Test default MAX_THREADS passed") + + # Test invalid MAX_THREADS + os.environ["MAX_THREADS"] = "abc" + try: + Config() + print("Test invalid MAX_THREADS failed (no exception raised)") + except ValueError as e: + assert str(e) == "MAX_THREADS must be a number" + print("Test invalid MAX_THREADS passed (exception raised)") + +if __name__ == "__main__": + test_config() diff --git a/settings.gradle b/settings.gradle index b4aa2658a6..9ff3d9808a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,4 +15,5 @@ include ":cwms-data-api" include ":cda-gui" include ":docs" project(":docs").projectDir = file("docs") -include ":cda-client" \ No newline at end of file +include ":cda-client" +include ":cda-etl" \ No newline at end of file From d4f735bb1a14d6d28d97f457a2683796a4c24014 Mon Sep 17 00:00:00 2001 From: Ryan Miles Date: Fri, 15 May 2026 20:03:19 -0700 Subject: [PATCH 2/3] Refactor and enhance CDA-ETL pipeline functionality - Split core functionality into modular components for improved clarity and maintainability, including separate processing for locations, projects, and timeseries. - Introduced caching logic in `cache_util.py` for optimized data retrieval and storage. - Added threading utilities for concurrent task execution in `threading_util.py`. - Enhanced `SessionManager` logic with dynamic session initialization. - Updated Gradle build to include a `runEtlUnitTests` task for streamlined testing. - Improved environment variable examples in `etl.env.example`. - Introduced comprehensive unit tests for locations, projects, and timeseries modules. - Various bug fixes and restructured imports for consistency. --- cda-etl/build.gradle | 9 ++ cda-etl/etl.env.example | 5 +- cda-etl/requirements.txt | 5 +- cda-etl/src/cda_etl/location.py | 52 +++---- cda-etl/src/cda_etl/main.py | 6 +- cda-etl/src/cda_etl/project.py | 50 ++++++- cda-etl/src/cda_etl/session_manager.py | 16 +- cda-etl/src/cda_etl/timeseries.py | 101 ++++++++++++- cda-etl/src/cda_etl/utils/cache_util.py | 8 +- cda-etl/src/cda_etl/utils/threading_util.py | 49 ++++++ .../{verify_config.py => test_config.py} | 4 +- cda-etl/tests/cda_etl/test_location.py | 98 ++++++++++++ cda-etl/tests/cda_etl/test_project.py | 96 ++++++++++++ cda-etl/tests/cda_etl/test_timeseries.py | 140 ++++++++++++++++++ 14 files changed, 589 insertions(+), 50 deletions(-) create mode 100644 cda-etl/src/cda_etl/utils/threading_util.py rename cda-etl/tests/cda_etl/{verify_config.py => test_config.py} (96%) create mode 100644 cda-etl/tests/cda_etl/test_location.py create mode 100644 cda-etl/tests/cda_etl/test_project.py create mode 100644 cda-etl/tests/cda_etl/test_timeseries.py diff --git a/cda-etl/build.gradle b/cda-etl/build.gradle index 6f987b1df8..00ae286b3d 100644 --- a/cda-etl/build.gradle +++ b/cda-etl/build.gradle @@ -66,3 +66,12 @@ tasks.register('runEtl', Exec) { } } } + +tasks.register('runEtlUnitTests', Exec) { + dependsOn 'installRequirements' + group 'verification' + executable pythonCmd + args '-m', 'pytest' + workingDir projectDir + environment 'PYTHONPATH', 'src/cda_etl' +} diff --git a/cda-etl/etl.env.example b/cda-etl/etl.env.example index a2b52675af..012936e41b 100644 --- a/cda-etl/etl.env.example +++ b/cda-etl/etl.env.example @@ -18,16 +18,19 @@ # SOFTWARE. # +# Required settings SOURCE_CDA_URL=https://cwms-data.usace.army.mil/cwms-data/ SOURCE_CDA_API_KEY= DEST_CDA_URL=http://localhost:7000/cwms-data DEST_CDA_API_KEY= START_TIME=2025-01-01 END_TIME=2026-01-01 + +# Optional settings MAX_THREADS=10 LOG_LEVEL=INFO # Data retrieval LOCATIONS=SWT.EUFA-Dam PROJECTS=SWT.EUFA -TIMESERIES=EUFA.Elev.Inst.1Hour.0.Ccp-Rev \ No newline at end of file +TIMESERIES=SWT.EUFA.Elev.Inst.1Hour.0.Ccp-Rev \ No newline at end of file diff --git a/cda-etl/requirements.txt b/cda-etl/requirements.txt index 420558645b..008f5b3399 100644 --- a/cda-etl/requirements.txt +++ b/cda-etl/requirements.txt @@ -1 +1,4 @@ -cwms-python \ No newline at end of file +cwms-python +pytest +pytest-mock +pytest-cov \ No newline at end of file diff --git a/cda-etl/src/cda_etl/location.py b/cda-etl/src/cda_etl/location.py index 272826a098..b5019e3dbd 100644 --- a/cda-etl/src/cda_etl/location.py +++ b/cda-etl/src/cda_etl/location.py @@ -16,11 +16,11 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. from dataclasses import dataclass -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor import logging import cwms -import traceback -import utils.cache_util as util +import utils.cache_util as cache_util +import utils.threading_util as threading_util logger = logging.getLogger(__name__) @@ -30,36 +30,24 @@ class LocationData: def process(config, session_manager): - return process_locations(config.locations, config.max_threads, session_manager) + return process_locations(config.locations, session_manager) -def process_locations(locations, max_threads, session_manager): +def process_locations(locations, session_manager): + # Retrieval session_manager.use_source_session() + retrieval_results = threading_util.execute_tasks(_retrieve_one_location, locations) - # Retrieval - results = [] - with ThreadPoolExecutor(max_workers=max_threads) as executor: - future_to_location = { - executor.submit(retrieve_one_location, location): location - for location in locations - } - - for future in as_completed(future_to_location): - location, id = future_to_location[future] - try: - result = future.result() - if result: - results.append(result) - else: - logger.warning(f"Location {id} not found") - except Exception as e: - logger.warning(f"Exception while retrieving location {id}: {e}") - traceback.print_exc() + # Storage + session_manager.use_dest_session() + storage_data = threading_util.execute_tasks(_store_one_location, retrieval_results) + + results = storage_data return LocationData(results) -def retrieve_one_location(location): +def _retrieve_one_location(location): # Split out office id based on dot notation splits = location.split(".") @@ -70,9 +58,15 @@ def retrieve_one_location(location): office_id = splits[0] location_id = splits[1] - util.get_from_cache(office_id, location_id) - - return cwms.get_location(location_id, office_id).json, location + cache_data = cache_util.get_from_cache(office_id, location_id) + if cache_data: + return cache_data + else: + location_data = cwms.get_location(location_id, office_id).json + cache_util.put_in_cache(location_data, office_id, location_id) + return location_data -def store_one_location(location_data, location_id): +def _store_one_location(location_data): + cwms.store_location(location_data) + return location_data diff --git a/cda-etl/src/cda_etl/main.py b/cda-etl/src/cda_etl/main.py index 38c94faa5b..7137a32bbc 100644 --- a/cda-etl/src/cda_etl/main.py +++ b/cda-etl/src/cda_etl/main.py @@ -22,6 +22,7 @@ import location import project import timeseries +import utils.threading_util from datetime import datetime from config import Config from session_manager import SessionManager @@ -30,13 +31,14 @@ def pipeline(config, session_manager): location_data = location.process(config, session_manager) - project_data = project.process(config, session_manager, location_data) - timeseries.process(config, session_manager, location_data) + project_data = project.process(config, session_manager) + timeseries.process(config, session_manager) def init(): config = Config() session_manager = SessionManager(config) + utils.threading_util.init_executor(config.max_threads) return config, session_manager diff --git a/cda-etl/src/cda_etl/project.py b/cda-etl/src/cda_etl/project.py index 7486f55184..88b58a9b05 100644 --- a/cda-etl/src/cda_etl/project.py +++ b/cda-etl/src/cda_etl/project.py @@ -17,12 +17,58 @@ # SOFTWARE. from dataclasses import dataclass import logging +import utils.threading_util as threading_util +import utils.cache_util as cache_util +import location +import cwms logger = logging.getLogger(__name__) @dataclass class ProjectData: project_ids: list[str] -def process(config, session_manager, location_data): - return ProjectData([]) +def process(config, session_manager): + return process_projects(config.projects, session_manager) + + +def process_projects(projects, session_manager): + # Make sure we have project locations downloaded + location.process_locations(projects, session_manager) + + # Retrieval + session_manager.use_source_session() + retrieval_results = threading_util.execute_tasks(_retrieve_one_project, projects) + + # Storage + session_manager.use_dest_session() + storage_data = threading_util.execute_tasks(_store_one_project, retrieval_results) + + results = storage_data + + return ProjectData(results) + + +def _retrieve_one_project(project): + # Split out office id based on dot notation + splits = project.split(".") + + if len(splits) != 2: + logger.warning(f"Invalid location format: {project}\nExpected [officeid].[locationid]") + return None + + office_id = splits[0] + project_id = splits[1] + + cache_data = cache_util.get_from_cache(office_id, project_id) + if cache_data: + return cache_data + else: + project_data = cwms.get_project(office_id, project_id).json + cache_util.put_in_cache(project_data, office_id, project_id) + return project_data + + +def _store_one_project(project_data): + cwms.store_project(project_data) + return project_data diff --git a/cda-etl/src/cda_etl/session_manager.py b/cda-etl/src/cda_etl/session_manager.py index 72d8298155..2d739567f0 100644 --- a/cda-etl/src/cda_etl/session_manager.py +++ b/cda-etl/src/cda_etl/session_manager.py @@ -17,19 +17,17 @@ # SOFTWARE. import cwms +from config import Config + + class SessionManager: - source_session = None - dest_session = None + config: Config def __init__(self, config): - self.dest_session = cwms.init_session(api_root=config.dest_cda_url, api_key=config.dest_cda_api_key) - self.source_session = cwms.init_session(api_root=config.source_cda_url, api_key=config.source_cda_api_key) - - def close(self): - pass + self.config = config def use_source_session(self): - cwms.api.SESSION = self.source_session + cwms.init_session(api_root=self.config.source_cda_url, api_key=self.config.source_cda_api_key) def use_dest_session(self): - cwms.api.SESSION = self.dest_session \ No newline at end of file + cwms.init_session(api_root=self.config.dest_cda_url, api_key=self.config.dest_cda_api_key) \ No newline at end of file diff --git a/cda-etl/src/cda_etl/timeseries.py b/cda-etl/src/cda_etl/timeseries.py index dbae4590cd..7ea79fe14a 100644 --- a/cda-etl/src/cda_etl/timeseries.py +++ b/cda-etl/src/cda_etl/timeseries.py @@ -16,7 +16,104 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import logging +import location +import utils.threading_util as threading_util +import utils.cache_util as cache_util +import cwms + logger = logging.getLogger(__name__) -def process(config, session_manager, location_data): - pass +class TsRetrievalData: + office_id: str + ts_id: str + + def __init__(self, office_id, ts_id, begin, end): + self.office_id = office_id + self.ts_id = ts_id + self.begin = begin + self.end = end + +class TsCacheData: + + def __init__(self, ts_data): + self.ts_data = ts_data + + +def process(config, session_manager): + return process_timeseries(config.timeseries, config.start_time, config.end_time, session_manager) + + +def process_timeseries(timeseries, begin, end, session_manager): + + invalid_ts = [] + ts_ids_to_split = {} + for ts in timeseries: + splits = ts.split(".") + if len(splits) != 7: + invalid_ts.append(ts) + else: + ts_ids_to_split[f"{splits[1]}.{splits[2]}.{splits[3]}.{splits[4]}.{splits[5]}.{splits[6]}"] = splits + + locations_to_retrieve = [] + ts_info = [] + for id, splits in ts_ids_to_split.items(): + locations_to_retrieve.append(f"{splits[0]}.{splits[1]}") + ts_info.append([splits[0], id, begin, end]) + + # Make sure we have project locations downloaded + location.process_locations(locations_to_retrieve, session_manager) + + # Retrieval of Identifier + session_manager.use_source_session() + retrieval_results = threading_util.execute_tasks(_retrieve_one_ts_identifier, ts_info) + + # Storage of Identifier + session_manager.use_dest_session() + threading_util.execute_tasks(_store_one_ts_id, retrieval_results) + + # Retrieval of Data + session_manager.use_source_session() + retrieval_results = threading_util.execute_tasks(_retrieve_one_ts_data, ts_info) + + # Storage of Data + session_manager.use_dest_session() + results = threading_util.execute_tasks(_store_one_ts_id, retrieval_results) + + return TsCacheData(results) + + +def _retrieve_one_ts_identifier(ts_info): + office_id = ts_info[0] + ts_id = ts_info[1] + + cache_data = cache_util.get_from_cache(office_id, ts_id, "id") + if cache_data: + return cache_data + else: + data = cwms.get_timeseries_identifier(office_id, ts_id).json + cache_util.put_in_cache(data, office_id, ts_id, "id") + return data + + +def _retrieve_one_ts_data(ts_info): + office_id = ts_info[0] + ts_id = ts_info[1] + begin = ts_info[2] + end = ts_info[3] + + cache_data = cache_util.get_from_cache(office_id, ts_id, begin, end, "data") + if cache_data: + return cache_data + else: + data = cwms.get_timeseries(ts_id, office_id, begin=begin, end=end).json + cache_util.put_in_cache(data, office_id, ts_id, begin, end, "data") + return data + + +def _store_one_ts_id(ts_id_data): + cwms.store_timeseries_identifier(ts_id_data) + return ts_id_data + +def _store_one_ts_data(ts_data): + cwms.store_timeseries(ts_data) + return ts_data diff --git a/cda-etl/src/cda_etl/utils/cache_util.py b/cda-etl/src/cda_etl/utils/cache_util.py index 8d4034b986..b60ae9d512 100644 --- a/cda-etl/src/cda_etl/utils/cache_util.py +++ b/cda-etl/src/cda_etl/utils/cache_util.py @@ -56,4 +56,10 @@ def _get_cache_path(*args): def put_in_cache(value, *args): - pass \ No newline at end of file + path = _get_cache_path(*args) + + if not os.path.exists(path): + os.makedirs(os.path.dirname(path), exist_ok=True) + + with open(path, "w", encoding="utf-8") as file: + json.dump(value, file, indent=2) \ No newline at end of file diff --git a/cda-etl/src/cda_etl/utils/threading_util.py b/cda-etl/src/cda_etl/utils/threading_util.py new file mode 100644 index 0000000000..d84f29e26a --- /dev/null +++ b/cda-etl/src/cda_etl/utils/threading_util.py @@ -0,0 +1,49 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import logging +import traceback +from concurrent.futures import as_completed, ThreadPoolExecutor + +logger = logging.getLogger(__name__) + +EXECUTOR: ThreadPoolExecutor + +def init_executor(max_workers): + global EXECUTOR + EXECUTOR = ThreadPoolExecutor(max_workers=max_workers) + + +def execute_tasks(task_func, items): + """ + Executes a task function for each item in a list using the provided executor. + Returns a dictionary mapping futures to items. + """ + futures_to_items = { + EXECUTOR.submit(task_func, item): item + for item in items + } + + results = [] + for future in as_completed(futures_to_items): + item = futures_to_items[future] + if future.exception(): + logger.warning(f"Exception occurred for {item}: {future.exception()}") + elif future.result(): + result = future.result() + results.append([item, result]) + return results diff --git a/cda-etl/tests/cda_etl/verify_config.py b/cda-etl/tests/cda_etl/test_config.py similarity index 96% rename from cda-etl/tests/cda_etl/verify_config.py rename to cda-etl/tests/cda_etl/test_config.py index f013df9140..cdad0edcd8 100644 --- a/cda-etl/tests/cda_etl/verify_config.py +++ b/cda-etl/tests/cda_etl/test_config.py @@ -19,7 +19,7 @@ import os import sys from datetime import datetime -from cda_etl.config import Config +from config import Config def test_config(): os.environ["SOURCE_CDA_URL"] = "http://source" @@ -48,5 +48,3 @@ def test_config(): assert str(e) == "MAX_THREADS must be a number" print("Test invalid MAX_THREADS passed (exception raised)") -if __name__ == "__main__": - test_config() diff --git a/cda-etl/tests/cda_etl/test_location.py b/cda-etl/tests/cda_etl/test_location.py new file mode 100644 index 0000000000..28c9548237 --- /dev/null +++ b/cda-etl/tests/cda_etl/test_location.py @@ -0,0 +1,98 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import pytest +from unittest.mock import MagicMock, patch +import location +from location import LocationData + +@pytest.fixture +def mock_config(): + config = MagicMock() + config.locations = ["SWT.TestLoc"] + return config + +@pytest.fixture +def mock_session_manager(): + return MagicMock() + +def test_process(mock_config, mock_session_manager, mocker): + mock_process_locations = mocker.patch("location.process_locations") + mock_process_locations.return_value = LocationData(["SWT.TestLoc"]) + + result = location.process(mock_config, mock_session_manager) + + mock_process_locations.assert_called_once_with(mock_config.locations, mock_session_manager) + assert result.location_ids == ["SWT.TestLoc"] + +def test_process_locations(mock_session_manager, mocker): + mock_execute = mocker.patch("utils.threading_util.execute_tasks") + + # Mock retrieval results: [[location_str, location_data], ...] + retrieval_results = [["SWT.TestLoc", {"name": "TestLoc"}]] + # Mock storage results: [[retrieval_result, storage_result], ...] + # where retrieval_result is ["SWT.TestLoc", {"name": "TestLoc"}] + storage_results = [[retrieval_results[0], {"name": "TestLoc"}]] + + mock_execute.side_effect = [retrieval_results, storage_results] + + locations = ["SWT.TestLoc"] + result = location.process_locations(locations, mock_session_manager) + + assert mock_session_manager.use_source_session.called + assert mock_session_manager.use_dest_session.called + assert len(mock_execute.call_args_list) == 2 + assert result.location_ids == storage_results + +def test_retrieve_one_location_invalid_format(mocker): + logger_spy = mocker.spy(location.logger, "warning") + result = location._retrieve_one_location("invalid_location") + assert result is None + assert logger_spy.called + +def test_retrieve_one_location_from_cache(mocker): + mock_get_cache = mocker.patch("utils.cache_util.get_from_cache") + mock_get_cache.return_value = {"name": "CachedLoc"} + + result = location._retrieve_one_location("SWT.CachedLoc") + + assert result == {"name": "CachedLoc"} + mock_get_cache.assert_called_once_with("SWT", "CachedLoc") + +def test_retrieve_one_location_from_cwms(mocker): + mocker.patch("utils.cache_util.get_from_cache", return_value=None) + mock_put_cache = mocker.patch("utils.cache_util.put_in_cache") + mock_cwms_get = mocker.patch("cwms.get_location") + + mock_response = MagicMock() + mock_response.json = {"name": "CwmsLoc"} + mock_cwms_get.return_value = mock_response + + result = location._retrieve_one_location("SWT.CwmsLoc") + + assert result == {"name": "CwmsLoc"} + mock_cwms_get.assert_called_once_with("CwmsLoc", "SWT") + mock_put_cache.assert_called_once_with({"name": "CwmsLoc"}, "SWT", "CwmsLoc") + +def test_store_one_location(mocker): + mock_cwms_store = mocker.patch("cwms.store_location") + location_data = {"name": "TestLoc"} + + result = location._store_one_location(location_data) + + assert result == location_data + mock_cwms_store.assert_called_once_with(location_data) diff --git a/cda-etl/tests/cda_etl/test_project.py b/cda-etl/tests/cda_etl/test_project.py new file mode 100644 index 0000000000..da7f8bf367 --- /dev/null +++ b/cda-etl/tests/cda_etl/test_project.py @@ -0,0 +1,96 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import pytest +from unittest.mock import MagicMock +import project +from project import ProjectData + +@pytest.fixture +def mock_config(): + config = MagicMock() + config.projects = ["SWT.TestProj"] + return config + +@pytest.fixture +def mock_session_manager(): + return MagicMock() + +def test_process(mock_config, mock_session_manager, mocker): + mock_process_projects = mocker.patch("project.process_projects") + mock_process_projects.return_value = ProjectData(["SWT.TestProj"]) + + result = project.process(mock_config, mock_session_manager) + + mock_process_projects.assert_called_once_with(mock_config.projects, mock_session_manager) + assert result.project_ids == ["SWT.TestProj"] + +def test_process_projects(mock_session_manager, mocker): + mocker.patch("location.process_locations") + mock_execute = mocker.patch("utils.threading_util.execute_tasks") + + retrieval_results = [["SWT.TestProj", {"name": "TestProj"}]] + storage_results = [[retrieval_results[0], {"name": "TestProj"}]] + + mock_execute.side_effect = [retrieval_results, storage_results] + + projects = ["SWT.TestProj"] + result = project.process_projects(projects, mock_session_manager) + + assert mock_session_manager.use_source_session.called + assert mock_session_manager.use_dest_session.called + assert len(mock_execute.call_args_list) == 2 + assert result.project_ids == storage_results + +def test_retrieve_one_project_invalid_format(mocker): + logger_spy = mocker.spy(project.logger, "warning") + result = project._retrieve_one_project("invalid_project") + assert result is None + assert logger_spy.called + +def test_retrieve_one_project_from_cache(mocker): + mock_get_cache = mocker.patch("utils.cache_util.get_from_cache") + mock_get_cache.return_value = {"name": "CachedProj"} + + result = project._retrieve_one_project("SWT.CachedProj") + + assert result == {"name": "CachedProj"} + mock_get_cache.assert_called_once_with("SWT", "CachedProj") + +def test_retrieve_one_project_from_cwms(mocker): + mocker.patch("utils.cache_util.get_from_cache", return_value=None) + mock_put_cache = mocker.patch("utils.cache_util.put_in_cache") + mock_cwms_get = mocker.patch("cwms.get_project") + + mock_response = MagicMock() + mock_response.json = {"name": "CwmsProj"} + mock_cwms_get.return_value = mock_response + + result = project._retrieve_one_project("SWT.CwmsProj") + + assert result == {"name": "CwmsProj"} + mock_cwms_get.assert_called_once_with("SWT", "CwmsProj") + mock_put_cache.assert_called_once_with({"name": "CwmsProj"}, "SWT", "CwmsProj") + +def test_store_one_project(mocker): + mock_cwms_store = mocker.patch("cwms.store_project") + project_data = {"name": "TestProj"} + + result = project._store_one_project(project_data) + + assert result == project_data + mock_cwms_store.assert_called_once_with(project_data) diff --git a/cda-etl/tests/cda_etl/test_timeseries.py b/cda-etl/tests/cda_etl/test_timeseries.py new file mode 100644 index 0000000000..b5bc43f234 --- /dev/null +++ b/cda-etl/tests/cda_etl/test_timeseries.py @@ -0,0 +1,140 @@ +# MIT License +# Copyright (c) 2026 Hydrologic Engineering Center +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import pytest +from unittest.mock import MagicMock +import timeseries +from timeseries import TsCacheData + +@pytest.fixture +def mock_config(): + config = MagicMock() + config.timeseries = ["SWT.TestLoc.Flow.Inst.1Hour.0.Cda"] + config.start_time = "2026-01-01T00:00:00" + config.end_time = "2026-01-02T00:00:00" + return config + +@pytest.fixture +def mock_session_manager(): + return MagicMock() + +def test_process(mock_config, mock_session_manager, mocker): + mock_process_timeseries = mocker.patch("timeseries.process_timeseries") + mock_process_timeseries.return_value = TsCacheData([]) + + result = timeseries.process(mock_config, mock_session_manager) + + mock_process_timeseries.assert_called_once_with( + mock_config.timeseries, mock_config.start_time, mock_config.end_time, mock_session_manager + ) + assert isinstance(result, TsCacheData) + +def test_process_timeseries(mock_session_manager, mocker): + mocker.patch("location.process_locations") + mock_execute = mocker.patch("utils.threading_util.execute_tasks") + + # Mocking results for 3 calls to execute_tasks + # 1. Identifier retrieval + # 2. Identifier storage + # 3. Data retrieval + # 4. Data storage + mock_execute.side_effect = [[], [], [], ["success"]] + + ts_list = ["SWT.Loc.Flow.Inst.1Hour.0.Cda"] + begin = "2026-01-01" + end = "2026-01-02" + + result = timeseries.process_timeseries(ts_list, begin, end, mock_session_manager) + + assert mock_session_manager.use_source_session.called + assert mock_session_manager.use_dest_session.called + assert mock_execute.call_count == 4 + assert result.ts_data == ["success"] + +def test_process_timeseries_invalid_format(mock_session_manager, mocker): + mocker.patch("location.process_locations") + mocker.patch("utils.threading_util.execute_tasks", return_value=[]) + + ts_list = ["invalid.format"] + result = timeseries.process_timeseries(ts_list, "begin", "end", mock_session_manager) + assert result.ts_data == [] + +def test_retrieve_one_ts_identifier_cache(mocker): + mock_get_cache = mocker.patch("utils.cache_util.get_from_cache") + mock_get_cache.return_value = {"id": "CachedId"} + + ts_info = ["SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end"] + result = timeseries._retrieve_one_ts_identifier(ts_info) + + assert result == {"id": "CachedId"} + mock_get_cache.assert_called_once_with("SWT", "Loc.Flow.Inst.1Hour.0.Cda", "id") + +def test_retrieve_one_ts_identifier_cwms(mocker): + mocker.patch("utils.cache_util.get_from_cache", return_value=None) + mock_put_cache = mocker.patch("utils.cache_util.put_in_cache") + mock_cwms_get = mocker.patch("cwms.get_timeseries_identifier") + + mock_response = MagicMock() + mock_response.json = {"id": "CwmsId"} + mock_cwms_get.return_value = mock_response + + ts_info = ["SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end"] + result = timeseries._retrieve_one_ts_identifier(ts_info) + + assert result == {"id": "CwmsId"} + mock_cwms_get.assert_called_once_with("SWT", "Loc.Flow.Inst.1Hour.0.Cda") + mock_put_cache.assert_called_once_with({"id": "CwmsId"}, "SWT", "Loc.Flow.Inst.1Hour.0.Cda", "id") + +def test_retrieve_one_ts_data_cache(mocker): + mock_get_cache = mocker.patch("utils.cache_util.get_from_cache") + mock_get_cache.return_value = {"data": "CachedData"} + + ts_info = ["SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end"] + result = timeseries._retrieve_one_ts_data(ts_info) + + assert result == {"data": "CachedData"} + mock_get_cache.assert_called_once_with("SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end", "data") + +def test_retrieve_one_ts_data_cwms(mocker): + mocker.patch("utils.cache_util.get_from_cache", return_value=None) + mock_put_cache = mocker.patch("utils.cache_util.put_in_cache") + mock_cwms_get = mocker.patch("cwms.get_timeseries") + + mock_response = MagicMock() + mock_response.json = {"data": "CwmsData"} + mock_cwms_get.return_value = mock_response + + ts_info = ["SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end"] + result = timeseries._retrieve_one_ts_data(ts_info) + + assert result == {"data": "CwmsData"} + mock_cwms_get.assert_called_once_with("Loc.Flow.Inst.1Hour.0.Cda", "SWT", begin="begin", end="end") + mock_put_cache.assert_called_once_with({"data": "CwmsData"}, "SWT", "Loc.Flow.Inst.1Hour.0.Cda", "begin", "end", "data") + +def test_store_one_ts_id(mocker): + mock_cwms_store = mocker.patch("cwms.store_timeseries_identifier") + data = {"id": "TestId"} + result = timeseries._store_one_ts_id(data) + assert result == data + mock_cwms_store.assert_called_once_with(data) + +def test_store_one_ts_data(mocker): + mock_cwms_store = mocker.patch("cwms.store_timeseries") + data = {"data": "TestData"} + result = timeseries._store_one_ts_data(data) + assert result == data + mock_cwms_store.assert_called_once_with(data) From eab23d8c12a65d5216feb8ccd157822c89fe399c Mon Sep 17 00:00:00 2001 From: Ryan Miles Date: Wed, 20 May 2026 09:56:34 -0700 Subject: [PATCH 3/3] Refactor ETL pipeline to decouple processing and caching for locations, projects, and timeseries - Replaced inline processing calls with modular `cache_*` and `store_cached_*` workflows. - Added caching and validation logic for projects, locations, and timeseries. - Consolidated threading and retrieval utilities for improved reliability. - Updated example environment variables and tests for modular workflows. - Improved logging for debugging and transparency during execution. --- cda-etl/etl.env.example | 2 +- cda-etl/src/cda_etl/location.py | 76 ++++++---- cda-etl/src/cda_etl/main.py | 15 +- cda-etl/src/cda_etl/project.py | 69 +++++----- cda-etl/src/cda_etl/session_manager.py | 3 + cda-etl/src/cda_etl/timeseries.py | 130 +++++++++--------- cda-etl/src/cda_etl/utils/cache_util.py | 4 +- cda-etl/src/cda_etl/utils/threading_util.py | 5 +- cda-etl/tests/cda_etl/test_location.py | 2 +- cda-etl/tests/cda_etl/test_project.py | 2 +- .../resources/SWT/Locations/EUFA-Dam.json | 22 +++ .../tests/resources/SWT/Locations/EUFA.json | 24 ++++ 12 files changed, 220 insertions(+), 134 deletions(-) create mode 100644 cda-etl/tests/resources/SWT/Locations/EUFA-Dam.json create mode 100644 cda-etl/tests/resources/SWT/Locations/EUFA.json diff --git a/cda-etl/etl.env.example b/cda-etl/etl.env.example index 012936e41b..9ad467be90 100644 --- a/cda-etl/etl.env.example +++ b/cda-etl/etl.env.example @@ -19,7 +19,7 @@ # # Required settings -SOURCE_CDA_URL=https://cwms-data.usace.army.mil/cwms-data/ +SOURCE_CDA_URL=https://cwms-data-test.cwbi.us/cwms-data/ SOURCE_CDA_API_KEY= DEST_CDA_URL=http://localhost:7000/cwms-data DEST_CDA_API_KEY= diff --git a/cda-etl/src/cda_etl/location.py b/cda-etl/src/cda_etl/location.py index b5019e3dbd..b722141dab 100644 --- a/cda-etl/src/cda_etl/location.py +++ b/cda-etl/src/cda_etl/location.py @@ -15,7 +15,6 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from dataclasses import dataclass from concurrent.futures import ThreadPoolExecutor import logging import cwms @@ -24,49 +23,66 @@ logger = logging.getLogger(__name__) -@dataclass -class LocationData: - location_ids: list[str] +def get_valid_locations(locations): + location_ids = [] + index = 0 + for location in locations: + splits = location.split(".") + if len(splits) != 2: + logger.warning(f"Invalid location at {index}: {location}\nExpected [officeid].[locationid]") + else: + logger.debug(f"Valid location found at {index}: {location}, splits: {splits}") + location_ids.append(splits) + index += 1 -def process(config, session_manager): - return process_locations(config.locations, session_manager) + if not location_ids: + logger.warning("No valid locations provided for processing") + return location_ids -def process_locations(locations, session_manager): - # Retrieval - session_manager.use_source_session() - retrieval_results = threading_util.execute_tasks(_retrieve_one_location, locations) +def cache_locations(locations): + # Validation + location_ids = get_valid_locations(locations) - # Storage - session_manager.use_dest_session() - storage_data = threading_util.execute_tasks(_store_one_location, retrieval_results) + if not location_ids: + logger.warning("No valid locations found for retrieving") + return - results = storage_data + # Retrieval + threading_util.execute_tasks(_retrieve_one_location, location_ids) - return LocationData(results) +def store_cached_locations(locations): + location_ids = get_valid_locations(locations) -def _retrieve_one_location(location): - # Split out office id based on dot notation - splits = location.split(".") + if not location_ids: + logger.warning("No valid locations found for retrieving") + return + + # Storage + threading_util.execute_tasks(_store_one_location, location_ids) - if len(splits) != 2: - logger.warning(f"Invalid location format: {location}\nExpected [officeid].[locationid]") - return None - office_id = splits[0] - location_id = splits[1] +def _retrieve_one_location(location): + office_id = location[0] + location_id = location[1] - cache_data = cache_util.get_from_cache(office_id, location_id) + logger.debug(f"Retrieving location data for office {office_id} and location {location_id}") + cache_data = cache_util.get_from_cache(office_id, "Locations", location_id) if cache_data: - return cache_data + logger.debug(f"Location data found in cache for office {office_id} and location {location_id}") else: + logger.debug(f"Location data not found in cache for office {office_id} and location {location_id}") location_data = cwms.get_location(location_id, office_id).json - cache_util.put_in_cache(location_data, office_id, location_id) - return location_data + cache_util.put_in_cache(location_data, office_id, "Locations", location_id) -def _store_one_location(location_data): - cwms.store_location(location_data) - return location_data +def _store_one_location(location): + office_id = location[0] + location_id = location[1] + location_data = cache_util.get_from_cache(office_id, "Locations", location_id) + if location_data: + cwms.store_location(location_data) + else: + logger.warning(f"Location data not found in cache for office {office_id} and location {location_id}") diff --git a/cda-etl/src/cda_etl/main.py b/cda-etl/src/cda_etl/main.py index 7137a32bbc..ddeb3d9e29 100644 --- a/cda-etl/src/cda_etl/main.py +++ b/cda-etl/src/cda_etl/main.py @@ -30,9 +30,18 @@ logger = logging.getLogger(__name__) def pipeline(config, session_manager): - location_data = location.process(config, session_manager) - project_data = project.process(config, session_manager) - timeseries.process(config, session_manager) + session_manager.use_source_session() + + # Read and cache data + location.cache_locations(config.locations) + project.cache_projects(config.projects) + timeseries.cache_timeseries(config.timeseries, config.start_time, config.end_time) + + session_manager.use_dest_session() + # Store cached data, so we're not keeping it all in memory + location.store_cached_locations(config.locations) + project.store_cached_projects(config.projects) + timeseries.store_cached_timeseries(config.timeseries, config.start_time, config.end_time) def init(): diff --git a/cda-etl/src/cda_etl/project.py b/cda-etl/src/cda_etl/project.py index 88b58a9b05..43681adfe0 100644 --- a/cda-etl/src/cda_etl/project.py +++ b/cda-etl/src/cda_etl/project.py @@ -15,7 +15,6 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from dataclasses import dataclass import logging import utils.threading_util as threading_util import utils.cache_util as cache_util @@ -23,52 +22,60 @@ import cwms logger = logging.getLogger(__name__) -@dataclass -class ProjectData: - project_ids: list[str] -def process(config, session_manager): - return process_projects(config.projects, session_manager) +def cache_projects(projects): + # Make sure we have project locations downloaded + location.cache_locations(projects) + # Validation + project_ids = location.get_valid_locations(projects) -def process_projects(projects, session_manager): - # Make sure we have project locations downloaded - location.process_locations(projects, session_manager) + if not project_ids: + logger.warning("No valid project identifiers found for processing") + return # Retrieval - session_manager.use_source_session() - retrieval_results = threading_util.execute_tasks(_retrieve_one_project, projects) + cwms.api.API_VERSION = 1 + threading_util.execute_tasks(_retrieve_one_project, project_ids) + cwms.api.API_VERSION = 2 - # Storage - session_manager.use_dest_session() - storage_data = threading_util.execute_tasks(_store_one_project, retrieval_results) - results = storage_data +def store_cached_projects(projects): + # Validation + project_ids = location.get_valid_locations(projects) - return ProjectData(results) + if not project_ids: + logger.warning("No valid project identifiers found for processing") + return + location.store_cached_locations(projects) -def _retrieve_one_project(project): - # Split out office id based on dot notation - splits = project.split(".") + # Storage + threading_util.execute_tasks(_store_one_project, project_ids) - if len(splits) != 2: - logger.warning(f"Invalid location format: {project}\nExpected [officeid].[locationid]") - return None - office_id = splits[0] - project_id = splits[1] +def _retrieve_one_project(project): + office_id = project[0] + project_id = project[1] - cache_data = cache_util.get_from_cache(office_id, project_id) + logger.debug(f"API_VERSION before retrieving {project_id}: {cwms.api.API_VERSION}") + logger.debug(f"Retrieving project data for office {office_id} and project {project_id}") + cache_data = cache_util.get_from_cache(office_id, "Projects", project_id) if cache_data: - return cache_data + logger.debug(f"Project data found in cache for office {office_id} and project {project_id}") else: + logger.debug(f"Project data not found in cache for office {office_id} and project {project_id}, retrieving from CWMS") project_data = cwms.get_project(office_id, project_id).json - cache_util.put_in_cache(project_data, office_id, project_id) - return project_data + cache_util.put_in_cache(project_data, office_id, "Projects", project_id) -def _store_one_project(project_data): - cwms.store_project(project_data) - return project_data +def _store_one_project(project): + office_id = project[0] + project_id = project[1] + logger.debug(f"API_VERSION before retrieving {project_id}: {cwms.api.API_VERSION}") + project_data = cache_util.get_from_cache(office_id, "Projects", project_id) + if project_data: + cwms.store_project(project_data) + else: + logger.warning(f"Project data not found in cache for office {office_id} and location {project_id}") diff --git a/cda-etl/src/cda_etl/session_manager.py b/cda-etl/src/cda_etl/session_manager.py index 2d739567f0..2e41bf3062 100644 --- a/cda-etl/src/cda_etl/session_manager.py +++ b/cda-etl/src/cda_etl/session_manager.py @@ -16,6 +16,8 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import cwms +import logging +logger = logging.getLogger(__name__) from config import Config @@ -30,4 +32,5 @@ def use_source_session(self): cwms.init_session(api_root=self.config.source_cda_url, api_key=self.config.source_cda_api_key) def use_dest_session(self): + logger.debug(f"Initializing destination session with URL: {self.config.dest_cda_url} and api_key: {self.config.dest_cda_api_key}") cwms.init_session(api_root=self.config.dest_cda_url, api_key=self.config.dest_cda_api_key) \ No newline at end of file diff --git a/cda-etl/src/cda_etl/timeseries.py b/cda-etl/src/cda_etl/timeseries.py index 7ea79fe14a..653775c6e2 100644 --- a/cda-etl/src/cda_etl/timeseries.py +++ b/cda-etl/src/cda_etl/timeseries.py @@ -22,77 +22,44 @@ import cwms logger = logging.getLogger(__name__) +DATE_TIME_FORMAT = "%Y-%m-%d %H.%M.%S" -class TsRetrievalData: - office_id: str - ts_id: str - def __init__(self, office_id, ts_id, begin, end): - self.office_id = office_id - self.ts_id = ts_id - self.begin = begin - self.end = end +def cache_timeseries(timeseries, begin, end): + locations, ts_info = _validate_and_split_timeseries(timeseries, begin, end) -class TsCacheData: - - def __init__(self, ts_data): - self.ts_data = ts_data - - -def process(config, session_manager): - return process_timeseries(config.timeseries, config.start_time, config.end_time, session_manager) - - -def process_timeseries(timeseries, begin, end, session_manager): + # Make sure we have project locations downloaded + location.cache_locations(locations) - invalid_ts = [] - ts_ids_to_split = {} - for ts in timeseries: - splits = ts.split(".") - if len(splits) != 7: - invalid_ts.append(ts) - else: - ts_ids_to_split[f"{splits[1]}.{splits[2]}.{splits[3]}.{splits[4]}.{splits[5]}.{splits[6]}"] = splits + # Retrieval of Identifier + threading_util.execute_tasks(_retrieve_one_ts_identifier, ts_info) - locations_to_retrieve = [] - ts_info = [] - for id, splits in ts_ids_to_split.items(): - locations_to_retrieve.append(f"{splits[0]}.{splits[1]}") - ts_info.append([splits[0], id, begin, end]) + # Retrieval of Data + threading_util.execute_tasks(_retrieve_one_ts_data, ts_info) - # Make sure we have project locations downloaded - location.process_locations(locations_to_retrieve, session_manager) - # Retrieval of Identifier - session_manager.use_source_session() - retrieval_results = threading_util.execute_tasks(_retrieve_one_ts_identifier, ts_info) +def store_cached_timeseries(timeseries, begin, end): + locations, ts_info = _validate_and_split_timeseries(timeseries, begin, end) + location.store_cached_locations(locations) # Storage of Identifier - session_manager.use_dest_session() - threading_util.execute_tasks(_store_one_ts_id, retrieval_results) - - # Retrieval of Data - session_manager.use_source_session() - retrieval_results = threading_util.execute_tasks(_retrieve_one_ts_data, ts_info) + threading_util.execute_tasks(_store_one_ts_id, ts_info) # Storage of Data - session_manager.use_dest_session() - results = threading_util.execute_tasks(_store_one_ts_id, retrieval_results) - - return TsCacheData(results) + threading_util.execute_tasks(_store_one_ts_data, ts_info) def _retrieve_one_ts_identifier(ts_info): office_id = ts_info[0] ts_id = ts_info[1] - cache_data = cache_util.get_from_cache(office_id, ts_id, "id") + cache_data = cache_util.get_from_cache(office_id, "Timeseries Identifiers", ts_id, "id") if cache_data: - return cache_data + logger.debug(f"Cached Timeseries Identifier for {office_id}.{ts_id}") else: - data = cwms.get_timeseries_identifier(office_id, ts_id).json - cache_util.put_in_cache(data, office_id, ts_id, "id") - return data + logger.debug(f"Fetching Timeseries Identifier for {office_id}.{ts_id}") + data = cwms.get_timeseries_identifier(ts_id, office_id).json + cache_util.put_in_cache(data, office_id, "Timeseries Identifiers", ts_id, "id") def _retrieve_one_ts_data(ts_info): @@ -100,20 +67,59 @@ def _retrieve_one_ts_data(ts_info): ts_id = ts_info[1] begin = ts_info[2] end = ts_info[3] + begin_str = begin.strftime(DATE_TIME_FORMAT) + end_str = end.strftime(DATE_TIME_FORMAT) - cache_data = cache_util.get_from_cache(office_id, ts_id, begin, end, "data") + cache_data = cache_util.get_from_cache(office_id, "Timeseries", ts_id, begin_str, end_str, "data") if cache_data: - return cache_data + logger.debug(f"Cached Timeseries Data for {office_id}.{ts_id} from {begin_str} to {end_str}") else: + logger.debug(f"Fetching Timeseries Data for {office_id}.{ts_id} from {begin_str} to {end_str}") data = cwms.get_timeseries(ts_id, office_id, begin=begin, end=end).json - cache_util.put_in_cache(data, office_id, ts_id, begin, end, "data") - return data + cache_util.put_in_cache(data, office_id, "Timeseries", ts_id, begin_str, end_str, "data") + +def _store_one_ts_id(ts_info): -def _store_one_ts_id(ts_id_data): - cwms.store_timeseries_identifier(ts_id_data) - return ts_id_data + office_id = ts_info[0] + ts_id = ts_info[1] + + cache_data = cache_util.get_from_cache(office_id, "Timeseries Identifiers", ts_id, "id") + cwms.store_timeseries_identifier(cache_data) + +def _store_one_ts_data(ts_info): + office_id = ts_info[0] + ts_id = ts_info[1] + begin = ts_info[2] + end = ts_info[3] + begin_str = begin.strftime(DATE_TIME_FORMAT) + end_str = end.strftime(DATE_TIME_FORMAT) + + cache_data = cache_util.get_from_cache(office_id, "Timeseries", ts_id, begin_str, end_str, "data") + cwms.store_timeseries(cache_data) + + +def _validate_and_split_timeseries(timeseries, begin, end): + # Validation + invalid_ts = [] + ts_ids_to_split = {} + for ts in timeseries: + splits = ts.split(".") + if len(splits) != 7: + logger.warning(f"Invalid time series identifier '{ts}' encountered. Expected format is '[office_id].[location].[parameter].[parameter_type].[interval].[duration].[version]'") + invalid_ts.append(ts) + else: + logger.debug(f"Valid time series identifier '{ts}'") + ts_ids_to_split[f"{splits[1]}.{splits[2]}.{splits[3]}.{splits[4]}.{splits[5]}.{splits[6]}"] = splits + + if not ts_ids_to_split: + logger.warning("No valid time series identifiers found for processing") + return + + locations = [] + ts_info = [] + for id, splits in ts_ids_to_split.items(): + locations.append(f"{splits[0]}.{splits[1]}") + ts_info.append([splits[0], id, begin, end]) -def _store_one_ts_data(ts_data): - cwms.store_timeseries(ts_data) - return ts_data + return locations, ts_info diff --git a/cda-etl/src/cda_etl/utils/cache_util.py b/cda-etl/src/cda_etl/utils/cache_util.py index b60ae9d512..2facc71db8 100644 --- a/cda-etl/src/cda_etl/utils/cache_util.py +++ b/cda-etl/src/cda_etl/utils/cache_util.py @@ -34,7 +34,9 @@ def get_from_cache(*args): return None with open(path, 'r') as f: - return json.load(f) + return f.read() + + def _get_cache_path(*args): diff --git a/cda-etl/src/cda_etl/utils/threading_util.py b/cda-etl/src/cda_etl/utils/threading_util.py index d84f29e26a..4d6aacc3fd 100644 --- a/cda-etl/src/cda_etl/utils/threading_util.py +++ b/cda-etl/src/cda_etl/utils/threading_util.py @@ -38,12 +38,9 @@ def execute_tasks(task_func, items): for item in items } - results = [] for future in as_completed(futures_to_items): item = futures_to_items[future] if future.exception(): logger.warning(f"Exception occurred for {item}: {future.exception()}") elif future.result(): - result = future.result() - results.append([item, result]) - return results + logger.debug(f"No error on execution for {item}") diff --git a/cda-etl/tests/cda_etl/test_location.py b/cda-etl/tests/cda_etl/test_location.py index 28c9548237..3e77242e5d 100644 --- a/cda-etl/tests/cda_etl/test_location.py +++ b/cda-etl/tests/cda_etl/test_location.py @@ -51,7 +51,7 @@ def test_process_locations(mock_session_manager, mocker): mock_execute.side_effect = [retrieval_results, storage_results] locations = ["SWT.TestLoc"] - result = location.process_locations(locations, mock_session_manager) + result = location.cache_locations(locations, mock_session_manager) assert mock_session_manager.use_source_session.called assert mock_session_manager.use_dest_session.called diff --git a/cda-etl/tests/cda_etl/test_project.py b/cda-etl/tests/cda_etl/test_project.py index da7f8bf367..e1e19fa096 100644 --- a/cda-etl/tests/cda_etl/test_project.py +++ b/cda-etl/tests/cda_etl/test_project.py @@ -49,7 +49,7 @@ def test_process_projects(mock_session_manager, mocker): mock_execute.side_effect = [retrieval_results, storage_results] projects = ["SWT.TestProj"] - result = project.process_projects(projects, mock_session_manager) + result = project.cache_projects(projects, mock_session_manager) assert mock_session_manager.use_source_session.called assert mock_session_manager.use_dest_session.called diff --git a/cda-etl/tests/resources/SWT/Locations/EUFA-Dam.json b/cda-etl/tests/resources/SWT/Locations/EUFA-Dam.json new file mode 100644 index 0000000000..ff006e794c --- /dev/null +++ b/cda-etl/tests/resources/SWT/Locations/EUFA-Dam.json @@ -0,0 +1,22 @@ +{ + "office-id": "SWT", + "name": "EUFA-Dam", + "latitude": 35.3070419, + "longitude": -95.3627509, + "active": true, + "public-name": "Dam", + "long-name": "Dam", + "timezone-name": "US/Central", + "location-kind": "EMBANKMENT", + "nation": "US", + "state-initial": "OK", + "county-name": "Haskell", + "nearest-city": "Hoyt, OK", + "horizontal-datum": "NAD83", + "published-longitude": 95.3625, + "published-latitude": 35.306944444444, + "vertical-datum": "NGVD29", + "elevation": 497.99868766404194, + "bounding-office-id": "SWT", + "elevation-units": "ft" +} \ No newline at end of file diff --git a/cda-etl/tests/resources/SWT/Locations/EUFA.json b/cda-etl/tests/resources/SWT/Locations/EUFA.json new file mode 100644 index 0000000000..35c719afc6 --- /dev/null +++ b/cda-etl/tests/resources/SWT/Locations/EUFA.json @@ -0,0 +1,24 @@ +{ + "office-id": "SWT", + "name": "EUFA", + "latitude": 35.3069444, + "longitude": -95.3625, + "active": true, + "public-name": "Eufaula Lake", + "long-name": "Eufaula Lake near Brooken, OK", + "description": "Eufaula Lake near Brooken, OK", + "timezone-name": "US/Central", + "location-type": "RESERVOIR", + "location-kind": "PROJECT", + "nation": "US", + "state-initial": "OK", + "county-name": "Haskell", + "nearest-city": "Hoyt, OK", + "horizontal-datum": "WGS84", + "published-longitude": 95.3625, + "published-latitude": 35.306944444444, + "vertical-datum": "NGVD29", + "elevation": 497.99868766404194, + "bounding-office-id": "SWT", + "elevation-units": "ft" +} \ No newline at end of file