import os
import json
from urllib.parse import urlparse
import posixpath
import requests
from mdps_ds_lib.lib.aws.aws_message_transformers import AwsMessageTransformers
from pystac import Item, Catalog, Collection

from mdps_ds_lib.lib.aws.aws_param_store import AwsParamStore
from mdps_ds_lib.lib.aws.aws_s3 import AwsS3
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())


class CatalyaArchiveTrigger:
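    """Walks a STAC catalog stored on S3 and triggers the UDS verbose-archive API for each item it references."""
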
    @staticmethod
    def join_s3_url(base_url: str, relative_path: str) -> str:
        """
        Join a base S3 URL with a relative path, properly handling '.', '..', and multiple levels.

        Examples:
            join_s3_url('s3://bucket/a/b/c/d', '../../data/abc.json') -> 's3://bucket/a/b/data/abc.json'
            join_s3_url('s3://bucket/a/b/c/d', './file.json') -> 's3://bucket/a/b/c/d/file.json'
            join_s3_url('s3://bucket/a/b/c/d', '../../../file.json') -> 's3://bucket/a/file.json'

        Note that base_url is treated as a directory, so './file.json' resolves inside 'd', not beside it.

        :param base_url: Base S3 URL (e.g., 's3://bucket/path/to/dir') or local path
        :param relative_path: Relative path to join (e.g., '../../data/file.json', './file.json')
        :return: Joined and normalized S3 URL or local path
        """
        if base_url.startswith('s3://'):
            # Parse the S3 URL
            parsed = urlparse(base_url)
            bucket = parsed.netloc
            path = parsed.path

            # Use posixpath.join to combine paths, then normpath to resolve '..' and '.'
            joined_path = posixpath.join(path, relative_path)
            normalized_path = posixpath.normpath(joined_path)

            # Reconstruct the S3 URL
            return f's3://{bucket}{normalized_path}'
        else:
            # For local paths, use os.path
            joined_path = os.path.join(base_url, relative_path)
            return os.path.normpath(joined_path)

    def __init__(self):
        self.__s3 = AwsS3()
        self.__ssm = AwsParamStore()
        # Name of the SSM parameter holding the UDS API credentials JSON (see start())
        self.__uds_api_creds_key = os.getenv('UDS_API_CREDS', '')

    def retrieve_all_stac_items(self, stac_catalog: dict, catalog_s3_url: str):
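        """
        Collect S3 URLs for every STAC item reachable from the given catalog dictionary.

        Follows 'item' links directly and descends one level into 'child'/'collection' links.

        :param stac_catalog: Parsed catalog.json as a dictionary
        :param catalog_s3_url: S3 URL of the catalog.json, used to resolve relative links
        :return: De-duplicated list of item S3 URLs
        """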
        catalog_dir = os.path.dirname(catalog_s3_url)
        LOGGER.debug(f"catalog S3 Dir: {catalog_dir}")

        catalog = Catalog.from_dict(stac_catalog)
        LOGGER.info(f"Successfully parsed STAC catalog: {catalog.id}")

        # Extract STAC Items from the catalog (including items from collections)
        item_links = []

        # Get all item/collection links from the catalog, skipping absolute http(s) links
        all_links = [k for k in catalog.get_links() if k.rel in ['item', 'child', 'collection'] and not k.target.startswith('http')]
        LOGGER.info(f"Found {len(all_links)} total eligible links in catalog")
        for each in all_links:
            if os.path.isabs(each.target) or each.target.startswith('s3://'):
                # Already absolute; no need to resolve against the catalog directory
                continue
            each.target = self.join_s3_url(catalog_dir, each.target)

        for link in all_links:
            # Check that the linked file exists in S3
            b, p = self.__s3.split_s3_url(link.target)
            if not self.__s3.exists(b, p):
                LOGGER.warning(f"Linked file not found in S3: {link.target}")
                continue

            # Handle different link types
            if link.rel == 'item':
                # Direct item link
                item_links.append(link.target)
                LOGGER.info(f"Found item link: {link.target}")
            elif link.rel == 'child' or link.rel == 'collection':
                # Collection link - read the collection and extract its items
                try:
                    collection_content = self.__s3.set_s3_url(link.target).read_small_txt_file()
                    collection = Collection.from_dict(json.loads(collection_content))
                    collection_folder = os.path.dirname(link.target)
                    temp_item_links = [k.target for k in collection.get_links() if
                                       k.rel in ['item'] and not k.target.startswith('http')]
                    for each in temp_item_links:
                        if os.path.isabs(each):
                            item_links.append(each)
                        else:
                            item_links.append(self.join_s3_url(collection_folder, each))
                    LOGGER.info(
                        f"Found collection '{collection.id}' with {len(temp_item_links)} items: {link.target}")
                except Exception as e:
                    LOGGER.warning(f"Failed to process collection link '{link.target}': {str(e)}")
                    continue
            else:
                # Other link types - log and ignore
                LOGGER.info(f"Ignoring link of type '{link.rel}': {link.target}")
        return list(set(item_links))
    def retrieve_items(self, item_urls: list):
        """
        Process a list of STAC item S3 URLs by downloading, parsing, and updating asset URLs.

        :param item_urls: List of S3 URLs (as strings, or link objects with a .target attribute) pointing to STAC item JSON files
        :return: Dictionary mapping each item's S3 URL to its processed STAC item dictionary
        """
        processed_items = {}

        for item_url_obj in item_urls:
            # Handle both string URLs and link objects
            item_s3_url = item_url_obj.target if hasattr(item_url_obj, 'target') else item_url_obj

            try:
                LOGGER.info(f'Processing STAC item: {item_s3_url}')

                # Download and parse STAC item
                item_content = self.__s3.set_s3_url(item_s3_url).read_small_txt_file()
                item_dict = json.loads(item_content)

                # Convert to pystac Item object
                stac_item = Item.from_dict(item_dict)
                if stac_item.collection_id is None or stac_item.collection_id == '':
                    LOGGER.warning(f'Missing collection_id for {item_s3_url}, skipping')
                    continue

                granule_id = stac_item.id
                LOGGER.debug(f'Downloaded STAC item: {granule_id}')

                # Convert relative asset URLs to absolute S3 URLs and verify they exist
                parsed_item_url = urlparse(item_s3_url)
                item_bucket = parsed_item_url.netloc
                item_path_parts = parsed_item_url.path.rsplit('/', 1)
                item_base_path = item_path_parts[0] if len(item_path_parts) > 1 else ''

                s3_base_path = f's3://{item_bucket}{item_base_path}'
                for asset_key, asset in stac_item.assets.items():
                    asset_href = asset.href

                    # If href is relative, convert to an absolute S3 URL
                    if not asset_href.startswith('s3://') and not asset_href.startswith('http'):
                        # join_s3_url resolves leading './' and '../' segments
                        absolute_s3_url = self.join_s3_url(s3_base_path, asset_href)
                        LOGGER.debug(f'Converted relative URL {asset.href} to absolute: {absolute_s3_url}')
                        asset.href = absolute_s3_url

                    # Verify the S3 URL exists
                    if asset.href.startswith('s3://'):
                        bucket, path = self.__s3.split_s3_url(asset.href)
                        if not self.__s3.exists(bucket, path):
                            raise FileNotFoundError(f'Asset does not exist at S3 URL: {asset.href}')
                        LOGGER.debug(f'Verified asset exists: {asset.href}')

                # Store the updated item dictionary
                processed_items[item_s3_url] = stac_item.to_dict()
                LOGGER.info(f'Successfully processed STAC item: {granule_id}')

            except Exception as e:
                LOGGER.exception(f'Error processing STAC item {item_s3_url}: {str(e)}')
                raise

        LOGGER.info(f'Processed {len(processed_items)} STAC items')
        return processed_items

    def start_with_event(self, event: dict):
        sns_message = AwsMessageTransformers().sqs_sns(event)
        s3_record = AwsMessageTransformers().get_s3_from_sns(sns_message)
        return self.start(f's3://{s3_record["bucket"]}/{s3_record["key"]}')

    def start(self, catalog_s3_url):
        """
        Steps:
        1. Validate the given URL; it must be an S3 URL.
        2. Download that catalog.json.
        3. Call retrieve_all_stac_items with the downloaded dictionary.
        4. How the items are retrieved is abstracted there; the return is a
           collection of item S3 URLs.
        5. For each STAC item, download it and convert it to a STAC Item object.
        6. Each asset href will be a relative URL; convert it to an S3 URL based on
           the item.json S3 URL, where the current folder is <bucket>/<path>/item.json.
           Ensure those S3 URLs exist along the way; throw an exception to quit for
           now (to be revisited later).
        7. From SSM, retrieve the parameter named by __uds_api_creds_key.
        8. For each item, use the UDS API credentials to call
           @router.put("/{collection_id}/verbose_archive/{granule_id}").
           See /cumulus_lambda_functions/catalya_uds_api/granules_archive_api.py for
           details. Requests are made serially for now.
        :param catalog_s3_url: S3 URL of the catalog.json to process
        :return:
        """
        # Step 1: Validate S3 URL
        LOGGER.info(f'Starting catalog archive trigger for: {catalog_s3_url}')
        if not catalog_s3_url or not catalog_s3_url.startswith('s3://'):
            raise ValueError(f'Invalid S3 URL: {catalog_s3_url}. Must start with s3://')

        parsed_url = urlparse(catalog_s3_url)
        if not parsed_url.netloc or not parsed_url.path:
            raise ValueError(f'Invalid S3 URL format: {catalog_s3_url}. Expected format: s3://<bucket>/<path>')

        # Step 2: Download catalog.json
        LOGGER.info(f'Downloading catalog from: {catalog_s3_url}')
        catalog_content = self.__s3.set_s3_url(catalog_s3_url).read_small_txt_file()
        catalog_dict = json.loads(catalog_content)
        LOGGER.debug('Catalog downloaded successfully')

        # Step 3: Retrieve all STAC items
        LOGGER.info('Retrieving all STAC items from catalog')
        item_s3_urls = self.retrieve_all_stac_items(catalog_dict, catalog_s3_url)
        LOGGER.info(f'Found {len(item_s3_urls)} STAC items to process')

        # Step 4-6: Process all items (download, parse, update assets)
        LOGGER.info('Processing and validating all STAC items')
        processed_items = self.retrieve_items(item_s3_urls)
        LOGGER.info(f'Successfully processed {len(processed_items)} STAC items')

        # Step 7: Retrieve UDS API credentials from SSM (done once, before the loop)
        if not self.__uds_api_creds_key:
            raise ValueError('UDS_API_CREDS environment variable not set')
        LOGGER.info(f'Retrieving UDS API credentials from SSM: {self.__uds_api_creds_key}')

        uds_api_creds_str = self.__ssm.get_param(self.__uds_api_creds_key)
        uds_api_creds = json.loads(uds_api_creds_str)

        # Extract API base URL and bearer token
        api_base_url = uds_api_creds.get('API_BASE_URL', '').rstrip('/')
        bearer_token = uds_api_creds.get('BEARER_TOKEN', '')

        if not api_base_url or not bearer_token:
            raise ValueError('UDS API credentials must contain API_BASE_URL and BEARER_TOKEN')

        LOGGER.info(f'API base URL: {api_base_url}')

        # Step 8: Trigger archive API requests one by one
        LOGGER.info('Triggering archive requests for all processed items')
        for item_s3_url, item_dict in processed_items.items():
            try:
                collection_id = item_dict.get('collection')
                granule_id = item_dict.get('id')

                if not collection_id or not granule_id:
                    LOGGER.error(f'Missing collection_id or granule_id in item: {item_s3_url}')
                    raise ValueError(f'Invalid STAC item missing collection or id: {item_s3_url}')

                LOGGER.info(f'Triggering archive for granule: {granule_id} from collection: {collection_id}')

                # Call the UDS API verbose_archive endpoint
                api_url = f'{api_base_url}/collections/{collection_id}/verbose_archive/{granule_id}'
                headers = {
                    # BEARER_TOKEN is assumed to already include its auth scheme (e.g. 'Bearer <token>')
                    'Authorization': bearer_token,
                    'Content-Type': 'application/json'
                }
                params = {
                    'item_s3_url': item_s3_url
                }

                LOGGER.info(f'Calling archive API: PUT {api_url}')
                response = requests.put(api_url, headers=headers, params=params, json=item_dict, timeout=30)
                response.raise_for_status()

                LOGGER.info(f'Successfully triggered archive for granule {granule_id}: {response.json()}')

            except Exception as e:
                LOGGER.exception(f'Error triggering archive for item {item_s3_url}: {str(e)}')
                raise

        LOGGER.info(f'Completed triggering archive for all {len(processed_items)} STAC items')
        return
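

# A minimal usage sketch, assuming this class is deployed behind an AWS Lambda fed by the
# SQS -> SNS -> S3 notification chain that start_with_event() expects. The handler name
# 'lambda_handler' is a hypothetical choice; the actual wiring is not defined in this file.
def lambda_handler(event: dict, context):
    # 'event' is the raw SQS event; start_with_event() unwraps it to a catalog.json S3 URL
    return CatalyaArchiveTrigger().start_with_event(event)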