Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ gradle.properties
kdata
pki/certs
compose.env
etl.env
.vscode
**/node_modules/
cwms-data-api/features.properties
cwms-data-api/features.properties
cda-etl/logs
cda-etl/cache
10 changes: 10 additions & 0 deletions cda-etl/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM python:3.14-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ /app/src/

ENTRYPOINT ["python", "src/cda_etl/main.py"]
77 changes: 77 additions & 0 deletions cda-etl/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* MIT License
* Copyright (c) 2026 Hydrologic Engineering Center
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

plugins {
id 'base'
}

def isWindows = {
System.getProperty('os.name').toLowerCase().contains('win')
}

final def pythonCmd = isWindows() ? 'python' : 'python3'
final def mainScript = 'src/cda_etl/main.py'
final def envFile = 'etl.env'
final def reqFile = 'requirements.txt'

tasks.register('installRequirements', Exec) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at what Stephen did on: https://github.com/DOI-BOR/WTMP-Python-Plotting/blob/main/build.gradle using a gradle plugin to manage python.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which we already do for Node JS. So makes sense to use a plugin for python as well.

commandLine pythonCmd, '-m', 'pip', 'install', '-r', reqFile
workingDir projectDir
inputs.file(new File(projectDir, reqFile))
outputs.file(new File(buildDir, 'pip-install.marker'))
doLast {
mkdir buildDir
new File(buildDir, 'pip-install.marker').text = "installed at ${new Date()}\n"
}
}

tasks.register('runEtl', Exec) {
dependsOn 'installRequirements'
group 'application'
executable pythonCmd
args mainScript
workingDir projectDir

// Load environment variables from etl.env
doFirst {
def envFileObj = new File(projectDir, envFile)
if (envFileObj.exists()) {
envFileObj.eachLine { line ->
if (line.trim() && !line.startsWith('#')) {
def parts = line.split('=', 2)
if (parts.length == 2) {
environment parts[0].trim(), parts[1].trim()
}
}
}
} else {
logger.warn("Environment file ${envFile} not found.")
}
}
}

tasks.register('runEtlUnitTests', Exec) {
dependsOn 'installRequirements'
group 'verification'
executable pythonCmd
args '-m', 'pytest'
workingDir projectDir
environment 'PYTHONPATH', 'src/cda_etl'
}
14 changes: 14 additions & 0 deletions cda-etl/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
services:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why separate from the root docker-compose.yml? I'd think the defaults should be sourced for CWBI Test and destination the CDA service container

cda-etl:
build: .
volumes:
- ./cache:/app/cache
- ./logs:/app/logs
environment:
-SOURCE_CDA_URL:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are all empty....

-SOURCE_CDA_API_KEY:
-DEST_CDA_URL:
-DEST_CDA_API_KEY:
-START_TIME:
-END_TIME:
-LOOKBACK:
36 changes: 36 additions & 0 deletions cda-etl/etl.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# MIT License
# Copyright (c) 2025 Hydrologic Engineering Center
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

# Required settings
SOURCE_CDA_URL=https://cwms-data-test.cwbi.us/cwms-data/
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the toggle to re-extract data from source based on the existence of this env variable? if so, it isn't a required variable

SOURCE_CDA_API_KEY=
DEST_CDA_URL=http://localhost:7000/cwms-data
DEST_CDA_API_KEY=
START_TIME=2025-01-01
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

START_TIME....leaves off the "time" component..... ;-)

END_TIME=2026-01-01
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a LOOKBACK variable being used in your empty docker-compose.yml variables that is missing here.


# Optional settings
MAX_THREADS=10
LOG_LEVEL=INFO

# Data retrieval
LOCATIONS=SWT.EUFA-Dam
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the list of data to load should not be defined by environment variables. Use an external config file (json, yml, etc)

PROJECTS=SWT.EUFA
TIMESERIES=SWT.EUFA.Elev.Inst.1Hour.0.Ccp-Rev
4 changes: 4 additions & 0 deletions cda-etl/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cwms-python
pytest
pytest-mock
pytest-cov
17 changes: 17 additions & 0 deletions cda-etl/src/cda_etl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# MIT License
# Copyright (c) 2026 Hydrologic Engineering Center
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
79 changes: 79 additions & 0 deletions cda-etl/src/cda_etl/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# MIT License
# Copyright (c) 2026 Hydrologic Engineering Center
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import logging
from datetime import datetime

logger = logging.getLogger(__name__)
DATE_TIME_FORMAT = "%Y-%m-%d"


class Config:
source_cda_url: str
source_cda_api_key: str
dest_cda_url: str
dest_cda_api_key: str
start_time: datetime
end_time: datetime
max_threads: int
locations: list[str]
projects: list[str]
timeseries: list[str]

def __init__(self):
self.source_cda_url = os.getenv("SOURCE_CDA_URL")
self.source_cda_api_key = os.getenv("SOURCE_CDA_API_KEY")
self.dest_cda_url = os.getenv("DEST_CDA_URL")
self.dest_cda_api_key = os.getenv("DEST_CDA_API_KEY")
start_time = os.getenv("START_TIME")
end_time = os.getenv("END_TIME")
max_threads = os.getenv("MAX_THREADS", "1")
self.locations = os.getenv("LOCATIONS", "").split(",")
self.projects = os.getenv("PROJECTS", "").split(",")
self.timeseries = os.getenv("TIMESERIES", "").split(",")

if not self.source_cda_url or not self.dest_cda_url:
raise ValueError("Missing required environment variables for CDA ETL configuration")

if not self.source_cda_api_key:
logger.warning("Missing SOURCE_CDA_API_KEY environment variable.")

if not self.dest_cda_api_key:
logger.warning("Missing DEST_CDA_API_KEY environment variable.")


if not start_time and not end_time:
raise ValueError("Must set both START_TIME and END_TIME in the format of %Y-%m-%d. Example:\n"
"START_TIME=2023-01-01\n"
"END_TIME=2023-01-31")

try:
self.start_time = datetime.strptime(start_time, DATE_TIME_FORMAT)
except:
raise ValueError("START_TIME must be in the format of %Y-%m-%d")
try:
self.end_time = datetime.strptime(end_time, DATE_TIME_FORMAT)
except:
raise ValueError("END_TIME must be in the format of %Y-%m-%d")

try:
self.max_threads = int(max_threads)
except ValueError:
raise ValueError("MAX_THREADS must be a number")

logger.info(f"Configuration read: {str(self)}")
88 changes: 88 additions & 0 deletions cda-etl/src/cda_etl/location.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# MIT License
# Copyright (c) 2026 Hydrologic Engineering Center
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from concurrent.futures import ThreadPoolExecutor
import logging
import cwms
import utils.cache_util as cache_util
import utils.threading_util as threading_util

logger = logging.getLogger(__name__)


def get_valid_locations(locations):
location_ids = []
index = 0
for location in locations:
splits = location.split(".")
if len(splits) != 2:
logger.warning(f"Invalid location at {index}: {location}\nExpected [officeid].[locationid]")
else:
logger.debug(f"Valid location found at {index}: {location}, splits: {splits}")
location_ids.append(splits)
index += 1

if not location_ids:
logger.warning("No valid locations provided for processing")
return location_ids


def cache_locations(locations):
# Validation
location_ids = get_valid_locations(locations)

if not location_ids:
logger.warning("No valid locations found for retrieving")
return

# Retrieval
threading_util.execute_tasks(_retrieve_one_location, location_ids)


def store_cached_locations(locations):
location_ids = get_valid_locations(locations)

if not location_ids:
logger.warning("No valid locations found for retrieving")
return

# Storage
threading_util.execute_tasks(_store_one_location, location_ids)


def _retrieve_one_location(location):
office_id = location[0]
location_id = location[1]

logger.debug(f"Retrieving location data for office {office_id} and location {location_id}")
cache_data = cache_util.get_from_cache(office_id, "Locations", location_id)
if cache_data:
logger.debug(f"Location data found in cache for office {office_id} and location {location_id}")
else:
logger.debug(f"Location data not found in cache for office {office_id} and location {location_id}")
location_data = cwms.get_location(location_id, office_id).json
cache_util.put_in_cache(location_data, office_id, "Locations", location_id)


def _store_one_location(location):
office_id = location[0]
location_id = location[1]
location_data = cache_util.get_from_cache(office_id, "Locations", location_id)
if location_data:
cwms.store_location(location_data)
else:
logger.warning(f"Location data not found in cache for office {office_id} and location {location_id}")
Loading
Loading