Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 79 additions & 30 deletions scripts/item_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from tqdm import tqdm

from stac_utils import (
check_geotiff_cog,
geotiff_extract_metadata,
item_create_from_cache,
date_extract_from_path,
datetime_parse_item,
encode_url_for_gdal,
Expand All @@ -52,6 +53,9 @@ def process_item(path_item: str, collection_id: str, path_local: str,
results_lookup: dict) -> dict | None:
"""Process a single GeoTIFF URL to create a STAC item.

Uses cached metadata when available (no remote read). Falls back to
rio_stac for cache misses (should not happen if validation ran first).

Returns dict with item_id and item object, or None if processing fails.
"""
href_item = fix_url(path_item)
Expand All @@ -70,7 +74,6 @@ def process_item(path_item: str, collection_id: str, path_local: str,
if date_str:
item_time = datetime_parse_item(date_str)
else:
# Placeholder for items where date cannot be extracted (e.g. albers10k2m)
item_time = datetime(2000, 1, 1, tzinfo=timezone.utc)
datetime_is_unknown = True

Expand All @@ -81,21 +84,35 @@ def process_item(path_item: str, collection_id: str, path_local: str,
)

try:
# Encode for GDAL/vsicurl (spaces → %20), but keep original for asset href
gdal_path = encode_url_for_gdal(path_item)
item = rio_stac.stac.create_stac_item(
gdal_path,
id=item_id,
asset_media_type=media_type,
asset_name='image',
asset_href=href_item,
with_proj=True,
collection=collection_id,
collection_url=PATH_S3_JSON,
asset_roles=["data"]
)
# Cache hit: build from metadata (no remote read)
if check.get("epsg") is not None:
item = item_create_from_cache(
url=path_item,
item_id=item_id,
metadata=check,
collection_id=collection_id,
collection_url=PATH_S3_JSON,
media_type=media_type,
item_datetime=item_time,
)
else:
# Cache miss: fall back to rio_stac (remote read)
logger.info("Cache miss for %s, reading remote file", href_item)
gdal_path = encode_url_for_gdal(path_item)
item = rio_stac.stac.create_stac_item(
gdal_path,
id=item_id,
asset_media_type=media_type,
asset_name='image',
asset_href=href_item,
with_proj=True,
collection=collection_id,
collection_url=PATH_S3_JSON,
asset_roles=["data"]
)
item.assets['image'].href = href_item

item.datetime = item_time
item.assets['image'].href = href_item

if datetime_is_unknown:
item.properties["datetime_unknown"] = True
Expand All @@ -114,30 +131,55 @@ def process_item(path_item: str, collection_id: str, path_local: str,
# =============================================================================

def load_validation_cache(urls_to_check: list[str]) -> dict:
"""Load cached validation results and validate new URLs as needed.
"""Load cached metadata and extract metadata for new URLs as needed.

Returns lookup dict: {url: {"is_geotiff": bool, "is_cog": bool}}
Returns lookup dict: {url: {is_geotiff, is_cog, epsg, height, width, transform, bounds}}

Old cache rows (missing spatial columns) trigger re-extraction on cache miss
in process_item via the rio_stac fallback path.
"""
all_columns = ["url", "is_geotiff", "is_cog", "epsg", "height", "width", "transform", "bounds"]

if os.path.exists(PATH_RESULTS_CSV):
df_existing = pd.read_csv(PATH_RESULTS_CSV)
existing_urls = set(df_existing["url"])
logger.info("Loaded %d existing validation results", len(df_existing))
else:
df_existing = pd.DataFrame(columns=["url", "is_geotiff", "is_cog"])
df_existing = pd.DataFrame(columns=all_columns)
existing_urls = set()
logger.info("No existing validation cache found, will validate all URLs")

urls_to_validate = [url for url in urls_to_check if url not in existing_urls]
logger.info("%d URLs need validation (%d already cached)",
# Detect old-format rows missing spatial metadata
has_spatial = set()
needs_upgrade = set()
if "epsg" in df_existing.columns:
for _, row in df_existing.iterrows():
if pd.notna(row.get("epsg")):
has_spatial.add(row["url"])
elif row.get("is_geotiff"):
needs_upgrade.add(row["url"])
else:
needs_upgrade = {row["url"] for _, row in df_existing.iterrows() if row["is_geotiff"]}

urls_to_validate = [url for url in urls_to_check
if url not in existing_urls or url in needs_upgrade]
if needs_upgrade:
# Drop old rows that will be re-extracted with spatial metadata
urls_upgrading = needs_upgrade & set(urls_to_validate)
if urls_upgrading:
df_existing = df_existing[~df_existing["url"].isin(urls_upgrading)]
logger.info("%d cached URLs need spatial metadata upgrade", len(urls_upgrading))

logger.info("%d URLs need metadata extraction (%d already cached with full metadata)",
len(urls_to_validate), len(urls_to_check) - len(urls_to_validate))

if urls_to_validate:
logger.info("Validating %d GeoTIFFs...", len(urls_to_validate))
logger.info("Extracting metadata from %d GeoTIFFs...", len(urls_to_validate))
with concurrent.futures.ThreadPoolExecutor() as executor:
new_results = list(tqdm(
executor.map(check_geotiff_cog, urls_to_validate),
executor.map(geotiff_extract_metadata, urls_to_validate),
total=len(urls_to_validate),
desc="Validating GeoTIFFs"
desc="Extracting GeoTIFF metadata"
))

df_new = pd.DataFrame(new_results)
Expand All @@ -146,12 +188,19 @@ def load_validation_cache(urls_to_check: list[str]) -> dict:
logger.info("Saved %d validation results to %s", len(df_all), PATH_RESULTS_CSV)
else:
df_all = df_existing
logger.info("No new URLs to validate, using existing cache")

return {
fix_url(row["url"]): {"is_geotiff": row["is_geotiff"], "is_cog": row["is_cog"]}
for _, row in df_all.iterrows()
}
logger.info("All URLs cached, no remote reads needed")

# Build lookup with full metadata (NaN → None for missing spatial columns)
result = {}
for _, row in df_all.iterrows():
entry = {"is_geotiff": row["is_geotiff"], "is_cog": row["is_cog"]}
for col in ["epsg", "height", "width", "transform", "bounds"]:
if col in row and pd.notna(row[col]):
entry[col] = int(row[col]) if col in ("epsg", "height", "width") else row[col]
else:
entry[col] = None
result[fix_url(row["url"])] = entry
return result


# =============================================================================
Expand Down
55 changes: 37 additions & 18 deletions scripts/item_reprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from datetime import datetime, timezone

from stac_utils import (
item_create_from_cache,
date_extract_from_path,
datetime_parse_item,
encode_url_for_gdal,
Expand Down Expand Up @@ -74,22 +75,35 @@ def process_item(path_item: str, collection, results_lookup) -> dict | None:
)

try:
gdal_path = encode_url_for_gdal(path_item)
item = rio_stac.stac.create_stac_item(
gdal_path,
id=item_id,
asset_media_type=media_type,
asset_name='image',
asset_href=href_item,
with_proj=True,
collection=collection.id,
collection_url=PATH_S3_JSON,
asset_roles=["data"]
)
# Cache hit: build from metadata (no remote read)
if check.get("epsg") is not None:
item = item_create_from_cache(
url=path_item,
item_id=item_id,
metadata=check,
collection_id=collection.id,
collection_url=PATH_S3_JSON,
media_type=media_type,
item_datetime=item_time,
)
else:
# Cache miss: fall back to rio_stac (remote read)
gdal_path = encode_url_for_gdal(path_item)
item = rio_stac.stac.create_stac_item(
gdal_path,
id=item_id,
asset_media_type=media_type,
asset_name='image',
asset_href=href_item,
with_proj=True,
collection=collection.id,
collection_url=PATH_S3_JSON,
asset_roles=["data"]
)
item.assets['image'].href = href_item

item.datetime = item_time
item.assets['image'].href = href_item

# Flag items with unknown datetime for future improvement
if datetime_is_unknown:
item.properties["datetime_unknown"] = True

Expand Down Expand Up @@ -125,10 +139,15 @@ def main():
return 1

df_all = pd.read_csv(PATH_RESULTS_CSV)
results_lookup = {
fix_url(row["url"]): {"is_geotiff": row["is_geotiff"], "is_cog": row["is_cog"]}
for _, row in df_all.iterrows()
}
results_lookup = {}
for _, row in df_all.iterrows():
entry = {"is_geotiff": row["is_geotiff"], "is_cog": row["is_cog"]}
for col in ["epsg", "height", "width", "transform", "bounds"]:
if col in row and pd.notna(row[col]):
entry[col] = int(row[col]) if col in ("epsg", "height", "width") else row[col]
else:
entry[col] = None
results_lookup[fix_url(row["url"])] = entry
print(f"✓ Loaded {len(results_lookup)} validation results")
print()

Expand Down
Loading