Conversation
…rs API
- Modified disasters_utils.py to extract EVENT metadata from GeoTIFF file attributes
- Added case-insensitive metadata reading for GeoTIFF tags
- Added extraction for providers, hazard, and location from S3 mappings
- Updated s3_discovery.py to filter None values before STAC ingestion
- Added support for disasters:add_providers config flag
- Created AWS DAGs variables configuration for disasters environment
- Updated STAC ingestor API URL to disasters endpoint
144a034 to f837ed9
```diff
@@ -0,0 +1,81 @@
+# AWS DAGs Variables Configuration
```
The aws_dags_variables secret in Secrets Manager is generated during deployment of self-managed Airflow, using values from the deployment secret, so we should not edit aws_dags_variables directly. That said, I'm admittedly having a hard time finding the definitions for the deployment envs that we can and should modify (https://github.com/NASA-IMPACT/self-managed-apache-airflow/blob/e67e48baad16674e5f14a14cbf55290c36304ccd/infrastructure/secrets/main.tf#L56).
I found some base docs in veda-deploy, but we need clearer definitions of how to update deployment envs (I think we lost them in a refactor): https://github.com/NASA-IMPACT/veda-deploy?tab=readme-ov-file#aws-secrets-requirements-for-sm2a
Let's work to improve the deployment env definitions with some of the definitions you have here, without overwriting the terraformed secret.
```bash
aws secretsmanager create-secret \
  --name veda-pipeline-dev/airflow/variables/aws_dags_variables \
  --secret-string file://aws_dags_variables.json \
```
This flag will not work like this - it doesn't unpack the file contents into valid JSON.
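One possible workaround (a sketch, not verified against this deployment; the file contents below are a stand-in) is to validate the JSON locally first and then pass the contents to the CLI explicitly:

```shell
# Stand-in variables file for the example.
printf '%s' '{"STAC_INGESTOR_API_URL": "https://example.test/api"}' > aws_dags_variables.json

# Fail fast if the file is not valid JSON.
python3 -c 'import json; json.load(open("aws_dags_variables.json")); print("valid json")'

# Then pass the validated contents explicitly instead of relying on file://
# (command shown but not executed here):
# aws secretsmanager create-secret \
#   --name veda-pipeline-dev/airflow/variables/aws_dags_variables \
#   --secret-string "$(cat aws_dags_variables.json)"
```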
```python
@lru_cache(maxsize=1)
def _get_country_codes_mapping() -> Dict[str, List[str]]:
    """Load and cache country codes mapping from S3."""
    data = _load_lut("monty_country_codes.json")
```
I think there's a risk with having LUTs stored as local-to-the-worker files - any changes would have to go through the PR -> review -> merge -> deploy process. It might fit the use case better for these to be managed as s3 files with Airflow Variables as pointers. Something like this:
```json
{
  "event_hazard_location": "s3://veda-tf-state-shared-disasters/reference/event-hazard-location.json",
  "country_codes": "s3://veda-tf-state-shared-disasters/reference/monty_country_codes.json",
  "hazard_codes": "s3://veda-tf-state-shared-disasters/reference/monty_hazard_codes.json"
}
```

```python
lut_config = Variable.get("disasters_lut_paths", deserialize_json=True)
event_mapping = _load_lut_from_s3(lut_config["event_hazard_location"])
...
```
Changes could be made in s3 and automatically applied to new tasks.
I actually asked to move these tables into git in order to transport them between instances while testing, and to move them out of the tf state shared bucket because I didn't think it made sense to re-use that bucket. However, I just had another look and see that we are already reusing it for scheduling collection discovery events, so that ship has sailed. Perhaps formalizing a reference directory in the tf shared state bucket does make sense.
I do think we need to document where those files should be stored, and probably add an informative error message for when the intended files are not found, so we don't mistake a missing file for an IAM/bucket permissions issue.
```python
    return {"monty:hazard_codes": [hazard["glide_code"], hazard["classification_code"]] if hazard else None}


def extract_hazard_and_location_from_geotiff(file_path: str) -> dict:
```
This and other functions aren't called anywhere - we should remove them if there isn't a plan for them:
- `extract_country_codes_from_geotiff`
- `extract_hazard_codes_from_geotiff`
- `extract_hazard_and_location_from_geotiff`
- `extract_corr_id_from_geotiff`
```diff
-def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> dict:
     """Group assets by matching regex patterns against discovered files."""
+def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extract_event_name: bool = False, extract_monty: bool = False, add_product: bool = False, add_providers: bool = False) -> dict:
```
There are a lot of disasters-specific flags here (and in other signatures in this file). Would it make more sense to move these to another task that looks like this:
```python
@task
def enrich_disaster_metadata(discovery_result: dict, extractors: list[str]) -> dict:
    for item in discovery_result["objects"]:
        file_path = next(iter(item["assets"].values()))["href"]
        if "monty" in extractors:
            item["properties"].update(extract_all_metadata_from_geotiff(file_path))
        # ...
    return discovery_result
```
This post-processing task could be included in a disasters-specific DAG, or could be a future home for other post-discovery metadata enrichment.
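To illustrate the pattern outside Airflow, here is a minimal stand-alone sketch; the extractor registry and the hazard-code values are stand-ins for the real GeoTIFF extractors, not the actual implementation:

```python
from typing import Callable, Dict, List

# Stand-in registry of named extractors; real versions would read GeoTIFF
# tags (e.g. via rasterio) rather than returning fixed example values.
EXTRACTORS: Dict[str, Callable[[str], dict]] = {
    "monty": lambda file_path: {"monty:hazard_codes": ["FL", "nat-hyd-flo-flo"]},
}


def enrich_disaster_metadata(discovery_result: dict, extractors: List[str]) -> dict:
    """Apply each requested extractor to the first asset of every discovered item."""
    for item in discovery_result["objects"]:
        file_path = next(iter(item["assets"].values()))["href"]
        for name in extractors:
            if name in EXTRACTORS:
                item["properties"].update(EXTRACTORS[name](file_path))
    return discovery_result


demo = {"objects": [{"assets": {"cog": {"href": "s3://bucket/event.tif"}}, "properties": {}}]}
result = enrich_disaster_metadata(demo, ["monty"])
```

Because the enrichment is keyed by extractor name, a disasters-specific DAG can opt in per collection without adding flags to the shared discovery signatures.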
Summary:
Addresses VEDA-XX: Develop amazing new feature

Changes

PR Checklist
- `terraform validate` and `terraform plan`