diff --git a/scripts/item_create.py b/scripts/item_create.py index 9b8deeb..828577b 100644 --- a/scripts/item_create.py +++ b/scripts/item_create.py @@ -28,7 +28,8 @@ from tqdm import tqdm from stac_utils import ( - check_geotiff_cog, + geotiff_extract_metadata, + item_create_from_cache, date_extract_from_path, datetime_parse_item, encode_url_for_gdal, @@ -52,6 +53,9 @@ def process_item(path_item: str, collection_id: str, path_local: str, results_lookup: dict) -> dict | None: """Process a single GeoTIFF URL to create a STAC item. + Uses cached metadata when available (no remote read). Falls back to + rio_stac for cache misses (should not happen if validation ran first). + Returns dict with item_id and item object, or None if processing fails. """ href_item = fix_url(path_item) @@ -70,7 +74,6 @@ def process_item(path_item: str, collection_id: str, path_local: str, if date_str: item_time = datetime_parse_item(date_str) else: - # Placeholder for items where date cannot be extracted (e.g. albers10k2m) item_time = datetime(2000, 1, 1, tzinfo=timezone.utc) datetime_is_unknown = True @@ -81,21 +84,35 @@ def process_item(path_item: str, collection_id: str, path_local: str, ) try: - # Encode for GDAL/vsicurl (spaces → %20), but keep original for asset href - gdal_path = encode_url_for_gdal(path_item) - item = rio_stac.stac.create_stac_item( - gdal_path, - id=item_id, - asset_media_type=media_type, - asset_name='image', - asset_href=href_item, - with_proj=True, - collection=collection_id, - collection_url=PATH_S3_JSON, - asset_roles=["data"] - ) + # Cache hit: build from metadata (no remote read) + if check.get("epsg") is not None: + item = item_create_from_cache( + url=path_item, + item_id=item_id, + metadata=check, + collection_id=collection_id, + collection_url=PATH_S3_JSON, + media_type=media_type, + item_datetime=item_time, + ) + else: + # Cache miss: fall back to rio_stac (remote read) + logger.info("Cache miss for %s, reading remote file", href_item) + gdal_path = encode_url_for_gdal(path_item) + item = rio_stac.stac.create_stac_item( + gdal_path, + id=item_id, + asset_media_type=media_type, + asset_name='image', + asset_href=href_item, + with_proj=True, + collection=collection_id, + collection_url=PATH_S3_JSON, + asset_roles=["data"] + ) + item.assets['image'].href = href_item + item.datetime = item_time - item.assets['image'].href = href_item if datetime_is_unknown: item.properties["datetime_unknown"] = True @@ -114,30 +131,55 @@ def process_item(path_item: str, collection_id: str, path_local: str, # ============================================================================= def load_validation_cache(urls_to_check: list[str]) -> dict: - """Load cached validation results and validate new URLs as needed. + """Load cached metadata and extract metadata for new URLs as needed. - Returns lookup dict: {url: {"is_geotiff": bool, "is_cog": bool}} + Returns lookup dict: {url: {is_geotiff, is_cog, epsg, height, width, transform, bounds}} + + Old cache rows (missing spatial columns) trigger re-extraction on cache miss + in process_item via the rio_stac fallback path. """ + all_columns = ["url", "is_geotiff", "is_cog", "epsg", "height", "width", "transform", "bounds"] + if os.path.exists(PATH_RESULTS_CSV): df_existing = pd.read_csv(PATH_RESULTS_CSV) existing_urls = set(df_existing["url"]) logger.info("Loaded %d existing validation results", len(df_existing)) else: - df_existing = pd.DataFrame(columns=["url", "is_geotiff", "is_cog"]) + df_existing = pd.DataFrame(columns=all_columns) existing_urls = set() logger.info("No existing validation cache found, will validate all URLs") - urls_to_validate = [url for url in urls_to_check if url not in existing_urls] - logger.info("%d URLs need validation (%d already cached)", + # Detect old-format rows missing spatial metadata + has_spatial = set() + needs_upgrade = set() + if "epsg" in df_existing.columns: + for _, row in df_existing.iterrows(): + if pd.notna(row.get("epsg")): + has_spatial.add(row["url"]) + elif row.get("is_geotiff"): + needs_upgrade.add(row["url"]) + else: + needs_upgrade = {row["url"] for _, row in df_existing.iterrows() if row["is_geotiff"]} + + urls_to_validate = [url for url in urls_to_check + if url not in existing_urls or url in needs_upgrade] + if needs_upgrade: + # Drop old rows that will be re-extracted with spatial metadata + urls_upgrading = needs_upgrade & set(urls_to_validate) + if urls_upgrading: + df_existing = df_existing[~df_existing["url"].isin(urls_upgrading)] + logger.info("%d cached URLs need spatial metadata upgrade", len(urls_upgrading)) + + logger.info("%d URLs need metadata extraction (%d already cached with full metadata)", len(urls_to_validate), len(urls_to_check) - len(urls_to_validate)) if urls_to_validate: - logger.info("Validating %d GeoTIFFs...", len(urls_to_validate)) + logger.info("Extracting metadata from %d GeoTIFFs...", len(urls_to_validate)) with concurrent.futures.ThreadPoolExecutor() as executor: new_results = list(tqdm( - executor.map(check_geotiff_cog, urls_to_validate), + executor.map(geotiff_extract_metadata, urls_to_validate), total=len(urls_to_validate), - desc="Validating GeoTIFFs" + desc="Extracting GeoTIFF metadata" )) df_new = pd.DataFrame(new_results) @@ -146,12 +188,19 @@ def load_validation_cache(urls_to_check: list[str]) -> dict: logger.info("Saved %d validation results to %s", len(df_all), PATH_RESULTS_CSV) else: df_all = df_existing - logger.info("No new URLs to validate, using existing cache") - - return { - fix_url(row["url"]): {"is_geotiff": row["is_geotiff"], "is_cog": row["is_cog"]} - for _, row in df_all.iterrows() - } + logger.info("All URLs cached, no remote reads needed") + + # Build lookup with full metadata (NaN → None for missing spatial columns) + result = {} + for _, row in df_all.iterrows(): + entry = {"is_geotiff": row["is_geotiff"], "is_cog": row["is_cog"]} + for col in ["epsg", "height", "width", "transform", "bounds"]: + if col in row and pd.notna(row[col]): + entry[col] = int(row[col]) if col in ("epsg", "height", "width") else row[col] + else: + entry[col] = None + result[fix_url(row["url"])] = entry + return result # ============================================================================= diff --git a/scripts/item_reprocess.py b/scripts/item_reprocess.py index c99d06b..26592c9 100644 --- a/scripts/item_reprocess.py +++ b/scripts/item_reprocess.py @@ -21,6 +21,7 @@ from datetime import datetime, timezone from stac_utils import ( + item_create_from_cache, date_extract_from_path, datetime_parse_item, encode_url_for_gdal, @@ -74,22 +75,35 @@ def process_item(path_item: str, collection, results_lookup) -> dict | None: ) try: - gdal_path = encode_url_for_gdal(path_item) - item = rio_stac.stac.create_stac_item( - gdal_path, - id=item_id, - asset_media_type=media_type, - asset_name='image', - asset_href=href_item, - with_proj=True, - collection=collection.id, - collection_url=PATH_S3_JSON, - asset_roles=["data"] - ) + # Cache hit: build from metadata (no remote read) + if check.get("epsg") is not None: + item = item_create_from_cache( + url=path_item, + item_id=item_id, + metadata=check, + collection_id=collection.id, + collection_url=PATH_S3_JSON, + media_type=media_type, + item_datetime=item_time, + ) + else: + # Cache miss: fall back to rio_stac (remote read) + gdal_path = encode_url_for_gdal(path_item) + item = rio_stac.stac.create_stac_item( + gdal_path, + id=item_id, + asset_media_type=media_type, + asset_name='image', + asset_href=href_item, + with_proj=True, + collection=collection.id, + collection_url=PATH_S3_JSON, + asset_roles=["data"] + ) + item.assets['image'].href = href_item + item.datetime = item_time - item.assets['image'].href = href_item - # Flag items with unknown datetime for future improvement if datetime_is_unknown: item.properties["datetime_unknown"] = True @@ -125,10 +139,15 @@ def main(): return 1 df_all = pd.read_csv(PATH_RESULTS_CSV) - results_lookup = { - fix_url(row["url"]): {"is_geotiff": row["is_geotiff"], "is_cog": row["is_cog"]} - for _, row in df_all.iterrows() - } + results_lookup = {} + for _, row in df_all.iterrows(): + entry = {"is_geotiff": row["is_geotiff"], "is_cog": row["is_cog"]} + for col in ["epsg", "height", "width", "transform", "bounds"]: + if col in row and pd.notna(row[col]): + entry[col] = int(row[col]) if col in ("epsg", "height", "width") else row[col] + else: + entry[col] = None + results_lookup[fix_url(row["url"])] = entry print(f"✓ Loaded {len(results_lookup)} validation results") print() diff --git a/scripts/stac_utils.py b/scripts/stac_utils.py index 744c57c..46a141b 100644 --- a/scripts/stac_utils.py +++ b/scripts/stac_utils.py @@ -8,11 +8,16 @@ - collection_create.py """ +import json import re -import subprocess from datetime import datetime, timezone +import pystac +import rasterio +import rasterio.warp import requests +from rio_cogeo.cogeo import cog_validate +from shapely.geometry import box, mapping # ============================================================================= @@ -87,27 +92,127 @@ def datetime_parse_item(s: str | None) -> datetime | None: # GeoTIFF Validation # ============================================================================= -def check_geotiff_cog(url: str) -> dict: - """Validate GeoTIFF and COG status using rio cogeo validate. +def geotiff_extract_metadata(url: str) -> dict: + """Extract spatial metadata and validate GeoTIFF/COG status in one remote read. - Returns dict with url, is_geotiff (readable), and is_cog (cloud-optimized). + Opens the remote GeoTIFF via /vsicurl/, extracts CRS, bounds, shape, and + transform, then validates COG status. All metadata needed for STAC item + creation is returned so subsequent builds can skip the remote read. + + Returns dict with url, is_geotiff, is_cog, epsg, height, width, transform, bounds. """ + gdal_url = encode_url_for_gdal(fix_url(url)) + vsicurl_path = f"/vsicurl/{gdal_url}" + try: - gdal_url = encode_url_for_gdal(url) - result = subprocess.run( - ["rio", "cogeo", "validate", f"/vsicurl/{gdal_url}"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=False - ) - output = result.stdout.decode() + result.stderr.decode() + with rasterio.open(vsicurl_path) as src: + epsg = src.crs.to_epsg() + height = src.height + width = src.width + transform = list(src.transform)[:6] + bounds = list(src.bounds) + + is_valid, _, _ = cog_validate(vsicurl_path, quiet=True) + + return { + "url": url, + "is_geotiff": True, + "is_cog": is_valid, + "epsg": epsg, + "height": height, + "width": width, + "transform": json.dumps(transform), + "bounds": json.dumps(bounds), + } + except Exception: return { "url": url, - "is_geotiff": "is NOT a valid cloud optimized GeoTIFF" in output or "is a valid cloud optimized GeoTIFF" in output, - "is_cog": "is a valid cloud optimized GeoTIFF" in output + "is_geotiff": False, + "is_cog": False, + "epsg": None, + "height": None, + "width": None, + "transform": None, + "bounds": None, } - except FileNotFoundError: - raise RuntimeError("`rio cogeo` is not installed or not in PATH. Install with: pip install rio-cogeo") + + +# ============================================================================= +# STAC Item Creation from Cache +# ============================================================================= + +def item_create_from_cache( + url: str, + item_id: str, + metadata: dict, + collection_id: str, + collection_url: str, + media_type: str, + item_datetime: datetime, +) -> pystac.Item: + """Build a pystac.Item from cached metadata without any remote file access. + + Uses cached CRS, bounds, shape, and transform to construct the item geometry + and proj extension fields. WGS84 bbox/geometry computed locally via + rasterio.warp.transform_bounds. + """ + epsg = metadata["epsg"] + height = metadata["height"] + width = metadata["width"] + transform = json.loads(metadata["transform"]) if isinstance(metadata["transform"], str) else metadata["transform"] + bounds = json.loads(metadata["bounds"]) if isinstance(metadata["bounds"], str) else metadata["bounds"] + + # Native CRS bounds → WGS84 bbox + left, bottom, right, top = bounds + w, s, e, n = rasterio.warp.transform_bounds( + f"EPSG:{epsg}", "EPSG:4326", left, bottom, right, top + ) + bbox = [w, s, e, n] + geometry = mapping(box(w, s, e, n)) + + # Native CRS geometry (rectangle from bounds) + proj_geometry = mapping(box(left, bottom, right, top)) + proj_bbox = [left, bottom, right, top] + + # 9-element affine (rasterio stores 6, STAC proj uses 9) + proj_transform = transform + [0.0, 0.0, 1.0] + + properties = { + "proj:epsg": epsg, + "proj:geometry": proj_geometry, + "proj:bbox": proj_bbox, + "proj:shape": [height, width], + "proj:transform": proj_transform, + } + + item = pystac.Item( + id=item_id, + geometry=geometry, + bbox=bbox, + datetime=item_datetime, + properties=properties, + stac_extensions=[ + "https://stac-extensions.github.io/projection/v1.1.0/schema.json" + ], + ) + + item.add_link(pystac.Link( + rel="collection", + target=collection_url, + media_type="application/json", + )) + item.collection_id = collection_id + + item.add_asset( + "image", + pystac.Asset( + href=fix_url(url), + media_type=media_type, + roles=["data"], + ), + ) + + return item # =============================================================================