Skip to content

Commit 42dfb59

Browse files
committed
getting dtype from layer_definition; adding simplified geometry to output
1 parent d205604 commit 42dfb59

1 file changed

Lines changed: 46 additions & 10 deletions

File tree

pipelines/rme_to_athena/rme_to_athena_parquet.py

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
Enhances Philip's June 2025 rme_to_athena.py
99
"""
1010
import argparse
11+
import json
1112
import logging
1213
import os
1314
from pathlib import Path
@@ -120,7 +121,7 @@ def get_matching_file(parent_dir: str, regex_str: str) -> str | None:
120121
def download_rme_geopackage(
121122
rs_api: RiverscapesAPI,
122123
project: RiverscapesProject,
123-
huc_dir: str
124+
huc_dir: str | Path
124125
) -> str:
125126
"""
126127
Download the RME GeoPackage for a project and return its file path.
@@ -132,6 +133,23 @@ def download_rme_geopackage(
132133
return rme_gpkg
133134

134135

136+
def get_layer_columns_dict(layer_definitions_path: Path, layer_id: str) -> dict[str, dict]:
    """
    Look up one layer in the layer_definitions.json file and return its column metadata.

    Args:
        layer_definitions_path (Path): the Path to the json file to use
        layer_id (str): The layer_id to look up.

    Returns:
        dict[str, dict]: Dictionary mapping column names to their property dicts.

    Raises:
        ValueError: if no layer with the given layer_id exists in the file.
    """
    definitions = json.loads(layer_definitions_path.read_text(encoding='utf-8'))
    # Find the first layer entry whose layer_id matches; None if absent.
    matching_layer = next(
        (layer for layer in definitions.get('layers', []) if layer.get('layer_id') == layer_id),
        None,
    )
    if matching_layer is None:
        raise ValueError(f"Layer ID '{layer_id}' not found in {layer_definitions_path}")
    return {column['name']: column for column in matching_layer.get('columns', [])}
151+
152+
135153
def extract_metrics_to_geodataframe(gpkg_path: str, spatialite_path: str) -> gpd.GeoDataFrame:
136154
"""
137155
Connect to the GeoPackage, run the SQL, and return a GeoDataFrame.
@@ -169,18 +187,36 @@ def extract_metrics_to_geodataframe(gpkg_path: str, spatialite_path: str) -> gpd
169187
warnings.filterwarnings("ignore", message="pandas only supports SQLAlchemy connectable")
170188
df = pd.read_sql_query(sql, conn)
171189

190+
# Use the data dictionary to set column types.
172191
# because there are nulls, the combination of sqlites dynamic typing and pandas' type inference mis-assigns data types
173192
# actually the problem is that it is sometimes a double, sometimes INT64. Needs to be consistent
174-
# TODO: this should look up the types from our data dictionary but i am hardcoding for now
175-
for field in ['fcode', 'seg_distance', 'stream_order', 'headwater', 'confluences', 'diffluences', 'tributaries']:
176-
df[field] = df[field].astype('Int64') # Note the capital 'I' for pandas nullable integer
193+
try:
194+
# NOTE - using this one definitions file to describe both INPUT AND OUTPUT structure
195+
# ideally we'd take the data types from the RME data dictionary and write them (along with any changes we're making) to the raw_rme version. But that doesn't exist yet
196+
# Possible enhancement: check the is_required property and if TRUE then we could use a nullable integer
197+
columns_dict = get_layer_columns_dict(Path(__file__).parent / 'layer_definitions.json', 'raw_rme')
198+
for field, props in columns_dict.items():
199+
if props.get('dtype') == 'INTEGER' and field in df.columns:
200+
df[field] = df[field].astype('Int64') # pandas nullable integer
201+
except Exception as e:
202+
raise Exception(f"Could not apply data dictionary types: {e}") from e
177203

178204
# Remove all columns named 'dgoid' (case-insensitive, even if duplicated)
179205
df = df.loc[:, [col for col in df.columns if col.lower() != 'dgoid']]
180206
# convert wkb geometry to shapely objects
181207
df['dgo_geom'] = df['dgo_geom'].apply(wkb.loads) # pyright: ignore[reportCallIssue, reportArgumentType]
182208
gdf = gpd.GeoDataFrame(df, geometry='dgo_geom', crs='EPSG:4326')
183209

210+
# Reproject to EPSG:5070 for simplification
211+
gdf_proj = gdf.to_crs(epsg=5070)
212+
213+
# Use simplify_coverage for topology-preserving simplification
214+
gdf["geometry_simplified"] = gdf_proj.geometry.simplify_coverage(tolerance=tolerance)
215+
# Reproject simplified geometry back to EPSG:4326
216+
gdf["geometry_simplified"] = gpd.GeoSeries(gdf["geometry_simplified"], crs=5070).to_crs(epsg=4326)
217+
gdf = gdf.set_crs(epsg=4326)
218+
gdf = gdf.reset_index(drop=True)
219+
184220
bbox_df = gdf.geometry.bounds.rename(columns={'minx': 'xmin', 'miny': 'ymin', 'maxx': 'xmax', 'maxy': 'ymax'})
185221
# Combine into a struct-like dict for each row
186222
gdf['dgo_geom_bbox'] = bbox_df.apply(
@@ -191,10 +227,10 @@ def extract_metrics_to_geodataframe(gpkg_path: str, spatialite_path: str) -> gpd
191227
return gdf
192228

193229

194-
def delete_folder(dirpath: str) -> None:
230+
def delete_folder(dirpath: Path) -> None:
195231
"""delete a local folder and its contents"""
196232
log = Logger('delete downloads')
197-
if os.path.isdir(dirpath):
233+
if dirpath.is_dir():
198234
try:
199235
log.info(f'Deleting directory {dirpath}')
200236
shutil.rmtree(dirpath)
@@ -241,7 +277,7 @@ def scrape_rme(
241277
# - Optionally clean up
242278

243279
log = Logger('Scrape RME')
244-
280+
download_dir = Path(download_dir)
245281
# NEW WAY
246282
# run Athena query to find all eligible projects that are newer than what is already scraped
247283
projects_to_add_df = query_to_dataframe(missing_projects_query, 'identify new projects')
@@ -267,7 +303,7 @@ def scrape_rme(
267303
model_version_int = semver_to_int(project.model_version)
268304

269305
try:
270-
huc_dir = os.path.join(download_dir, project.huc)
306+
huc_dir = download_dir / project.huc
271307
safe_makedirs(huc_dir)
272308
gpkg_path = download_rme_geopackage(rs_api, project, huc_dir)
273309
data_gdf = extract_metrics_to_geodataframe(gpkg_path, spatialite_path)
@@ -281,10 +317,10 @@ def scrape_rme(
281317
# until we have a more robust schema check this is something
282318
if len(data_gdf.columns) != 134:
283319
log.warning(f"Expected 134 columns, got {len(data_gdf.columns)}")
284-
rme_pq_filepath = os.path.join(huc_dir, f'rme_{project.huc}.parquet')
320+
rme_pq_filepath = huc_dir / f'rme_{project.huc}.parquet'
285321
data_gdf.to_parquet(rme_pq_filepath)
286322
# don't use os.path.join because this is aws os, not system os
287-
s3_key = f'data_exchange/riverscape_metrics/{os.path.basename(rme_pq_filepath)}'
323+
s3_key = f'data_exchange/riverscape_metrics/{rme_pq_filepath.name}'
288324
upload_to_s3(rme_pq_filepath, data_bucket, s3_key)
289325

290326
if delete_downloads_when_done:

0 commit comments

Comments
 (0)