Skip to content

Add CDA-ETL module with initial implementation and basic functionality#1732

Open
RyanM-RMA wants to merge 3 commits into
USACE:developfrom
RyanM-RMA:feature/add_etl_processing
Open

Add CDA-ETL module with initial implementation and basic functionality#1732
RyanM-RMA wants to merge 3 commits into
USACE:developfrom
RyanM-RMA:feature/add_etl_processing

Conversation

@RyanM-RMA
Copy link
Copy Markdown
Collaborator

@RyanM-RMA RyanM-RMA commented May 15, 2026

Includes the addition of the following:

  • Dockerfile and docker-compose for containerizing the service.
  • Gradle build configuration for the module.
  • Core ETL pipeline components: configuration, session management, location, project, and timeseries processing.
  • Environment variable management with etl.env.example.
  • Utility functions and cache handling.
  • Initial set of unit tests for basic validation (e.g., configuration handling).

Summary

Add module for Extract Transform and Load (ETL) between CDA API's.

Related Issue

Closes https://jira.hecdev.net/browse/REGI-481

Validation

Tested by running the gradle and docker-compose processes, verifying data is valid through running CWMSVue and REGI.

Checklist

  • AI tools used

Includes the addition of the following:
- Dockerfile and docker-compose for containerizing the service.
- Gradle build configuration for the module.
- Core ETL pipeline components: configuration, session management, location, project, and timeseries processing.
- Environment variable management with `etl.env.example`.
- Utility functions and cache handling.
- Initial set of unit tests for basic validation (e.g., configuration handling).
@@ -0,0 +1,14 @@
services:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why separate from the root docker-compose.yml? I'd think the defaults should be sourced for CWBI Test and destination the CDA service container

Comment thread cda-etl/build.gradle
final def envFile = 'etl.env'
final def reqFile = 'requirements.txt'

tasks.register('installRequirements', Exec) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at what Stephen did on: https://github.com/DOI-BOR/WTMP-Python-Plotting/blob/main/build.gradle using a gradle plugin to manage python.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which we already do for Node JS. So makes sense to use a plugin for python as well.

# SOFTWARE.
import cwms

class SessionManager:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@krowvin
Copy link
Copy Markdown
Collaborator

krowvin commented May 15, 2026

@Enovotny

Might be some ideas in here we can use with cwms-cli

I thought this was a novel idea. Setting locations in the env for reuse

Ie
self.locations = os.getenv("LOCATIONS", "").split(",")

RyanM-RMA added 2 commits May 15, 2026 20:03
- Split core functionality into modular components for improved clarity and maintainability, including separate processing for locations, projects, and timeseries.
- Introduced caching logic in `cache_util.py` for optimized data retrieval and storage.
- Added threading utilities for concurrent task execution in `threading_util.py`.
- Enhanced `SessionManager` logic with dynamic session initialization.
- Updated Gradle build to include a `runEtlUnitTests` task for streamlined testing.
- Improved environment variable examples in `etl.env.example`.
- Introduced comprehensive unit tests for locations, projects, and timeseries modules.
- Various bug fixes and restructured imports for consistency.
…s, projects, and timeseries

- Replaced inline processing calls with modular `cache_*` and `store_cached_*` workflows.
- Added caching and validation logic for projects, locations, and timeseries.
- Consolidated threading and retrieval utilities for improved reliability.
- Updated example environment variables and tests for modular workflows.
- Improved logging for debugging and transparency during execution.
@RyanM-RMA RyanM-RMA marked this pull request as ready for review May 20, 2026 17:18
Comment thread cda-etl/etl.env.example
LOG_LEVEL=INFO

# Data retrieval
LOCATIONS=SWT.EUFA-Dam
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the list of data to load should not be defined by environment variables. Use an external config file (json, yml, etc)

Comment thread cda-etl/etl.env.example
#

# Required settings
SOURCE_CDA_URL=https://cwms-data-test.cwbi.us/cwms-data/
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the toggle to re-extract data from source based on the existence of this env variable? if so, it isn't a required variable

# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import pytest
from unittest.mock import MagicMock, patch
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maintaining mock data for unit tests at this layer is unnecessary maintenance overhead. Integration tests against the real destination data source is much more valuable.

return

# Retrieval
cwms.api.API_VERSION = 1
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's with these api versions being set to 1 and 2?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a cwms-python bug that I was trying to work around. I've created an issue to track it. The code has been removed.

HydrologicEngineeringCenter/cwms-python#289

timeseries.cache_timeseries(config.timeseries, config.start_time, config.end_time)

session_manager.use_dest_session()
# Store cached data, so we're not keeping it all in memory
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment implies that it is in memory now - it should be on the file system at this point right, not in memory?

session_manager.use_source_session()

# Read and cache data
location.cache_locations(config.locations)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we aren't caching data here, we're storing and preserving that data in source control

for ts in timeseries:
splits = ts.split(".")
if len(splits) != 7:
logger.warning(f"Invalid time series identifier '{ts}' encountered. Expected format is '[office_id].[location].[parameter].[parameter_type].[interval].[duration].[version]'")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is office id on the time series id? we really don't need any validation here - CDA itself will validate the data requests

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be resolved with the change to YAML based configuration.

threading_util.execute_tasks(_store_one_ts_data, ts_info)


def _retrieve_one_ts_identifier(ts_info):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by this method, if cache_data exists, we just drop it on the floor?

ts_id = ts_info[1]

cache_data = cache_util.get_from_cache(office_id, "Timeseries Identifiers", ts_id, "id")
cwms.store_timeseries_identifier(cache_data)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the store ts call will create the identifier, no need for a separate method call

Comment thread cda-etl/src/cda_etl/timeseries.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants