diff --git a/CHANGELOG.md b/CHANGELOG.md index 7262a1c..bef2003 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] (post 4.2.0 release) ### Added +- Support for new OGC endpoints ### Changed ### Deprecated ### Removed +- Removed functions calling old WPST endpoints ### Fixed ### Security @@ -17,7 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added ### Changed - listJobs no longer takes username as an argument, you can only list jobs for the current `MAAP_PGT` token user -- submitJob gets the username from the `MAAP_PGT` token and not username being submitted as an argument +- submit_job gets the username from the `MAAP_PGT` token and not username being submitted as an argument ### Deprecated ### Removed ### Fixed diff --git a/README.md b/README.md index 2e840ad..189036c 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ $ python >>> from maap.maap import MAAP >>> maap = MAAP() ->>> granules = maap.searchGranule(sitename='lope', instrument='uavsar') +>>> granules = maap.search_granule(sitename='lope', instrument='uavsar') >>> for res in granules: print(res.getDownloadUrl()) res.download() @@ -64,17 +64,19 @@ where: With named attribute parameters, this query: ```python -lidarGranule = maap.searchGranule(instrument='lvis', attribute='string,Site Name,lope') +lidarGranule = maap.search_granule(instrument='lvis', attribute='string,Site Name,lope') ``` Simplifies to: ```python -lidarGranule = maap.searchGranule(instrument='lvis', site_name='lope') +lidarGranule = maap.search_granule(instrument='lvis', site_name='lope') ``` ## Test ```bash -python setup.py test +poetry install +poetry run pytest --cov=maap +poetry run pytest test/specific_test.py ``` diff --git a/docs/api/dps.md b/docs/api/dps.md index 3a30b6e..33ece6c 100644 --- a/docs/api/dps.md +++ b/docs/api/dps.md @@ -32,7 +32,7 @@ from maap.maap import 
MAAP maap = MAAP() # Submit a job -job = maap.submitJob( +job = maap.submit_job( identifier='my_analysis', algo_id='my_algorithm', version='main', diff --git a/docs/api/maap.md b/docs/api/maap.md index 5b06c20..2f28189 100644 --- a/docs/api/maap.md +++ b/docs/api/maap.md @@ -19,13 +19,13 @@ from maap.maap import MAAP maap = MAAP() # Search granules -granules = maap.searchGranule(short_name='GEDI02_A', limit=10) +granules = maap.search_granule(short_name='GEDI02_A', limit=10) # Search collections -collections = maap.searchCollection(provider='MAAP') +collections = maap.search_collection(provider='MAAP') # Submit a job -job = maap.submitJob( +job = maap.submit_job( identifier='analysis', algo_id='my_algo', version='main', diff --git a/docs/api/result.md b/docs/api/result.md index 9cbafa3..6a677c2 100644 --- a/docs/api/result.md +++ b/docs/api/result.md @@ -49,7 +49,7 @@ from maap.maap import MAAP maap = MAAP() # Search granules -granules = maap.searchGranule(short_name='GEDI02_A', limit=5) +granules = maap.search_granule(short_name='GEDI02_A', limit=5) for granule in granules: # Get URLs diff --git a/docs/index.md b/docs/index.md index c1bc030..402a72c 100644 --- a/docs/index.md +++ b/docs/index.md @@ -30,7 +30,7 @@ from maap.maap import MAAP maap = MAAP() # Search for granules -granules = maap.searchGranule( +granules = maap.search_granule( short_name='GEDI02_A', bounding_box='-122.5,37.5,-121.5,38.5', limit=10 @@ -42,7 +42,7 @@ for granule in granules: print(f"Downloaded: {local_path}") # Submit a job -job = maap.submitJob( +job = maap.submit_job( identifier='my_analysis', algo_id='my_algorithm', version='main', diff --git a/examples/BrowseExample.ipynb b/examples/BrowseExample.ipynb index a18d022..6f0ca7e 100644 --- a/examples/BrowseExample.ipynb +++ b/examples/BrowseExample.ipynb @@ -386,7 +386,7 @@ } ], "source": [ - "granule = maap.searchGranule(granule_ur='uavsar_AfriSAR_v1_SLC-lopenp_14043_16015_001_160308_L090.vrt')[0]\n", + "granule = 
maap.search_granule(granule_ur='uavsar_AfriSAR_v1_SLC-lopenp_14043_16015_001_160308_L090.vrt')[0]\n", "maap.show(granule)" ] }, @@ -747,7 +747,7 @@ } ], "source": [ - "granule = maap.searchGranule(granule_ur='ILVIS2_GA2016_0220_R1611_038024')[0]\n", + "granule = maap.search_granule(granule_ur='ILVIS2_GA2016_0220_R1611_038024')[0]\n", "maap.show(granule)" ] } diff --git a/examples/Search Collection - Basics-checkpoint.ipynb b/examples/Search Collection - Basics-checkpoint.ipynb index 49f3ddf..544c247 100644 --- a/examples/Search Collection - Basics-checkpoint.ipynb +++ b/examples/Search Collection - Basics-checkpoint.ipynb @@ -53,7 +53,7 @@ "metadata": {}, "outputs": [], "source": [ - "results = maap.searchCollection(keyword='precipitation')" + "results = maap.search_collection(keyword='precipitation')" ] }, { diff --git a/examples/Search Granule-checkpoint.ipynb b/examples/Search Granule-checkpoint.ipynb index 11c7f47..d3eb1bd 100644 --- a/examples/Search Granule-checkpoint.ipynb +++ b/examples/Search Granule-checkpoint.ipynb @@ -63,7 +63,7 @@ } ], "source": [ - "results = maap.searchCollection(keyword='land',data_center='modaps')\n", + "results = maap.search_collection(keyword='land',data_center='modaps')\n", "print(len(results))\n", "\n", "for res in results:\n", @@ -159,7 +159,7 @@ } ], "source": [ - "results = maap.searchGranule(limit=10,short_name=\"MOD11A1\")\n", + "results = maap.search_granule(limit=10,short_name=\"MOD11A1\")\n", "\n", "print(len(results))\n", "for res in results:\n", diff --git a/maap/AWS.py b/maap/AWS.py index 3803c84..fe2a53f 100644 --- a/maap/AWS.py +++ b/maap/AWS.py @@ -336,7 +336,7 @@ def workspace_bucket_credentials(self): See Also -------- :meth:`requester_pays_credentials` : For accessing external data - :meth:`maap.maap.MAAP.uploadFiles` : Upload files to shared storage + :meth:`maap.maap.MAAP.upload_files` : Upload files to shared storage """ headers = self._api_header headers["Accept"] = "application/json" diff --git 
a/maap/Profile.py b/maap/Profile.py index 3bc2ce5..eb3a62d 100644 --- a/maap/Profile.py +++ b/maap/Profile.py @@ -100,7 +100,7 @@ def account_info(self, proxy_ticket=None): Notes ----- - This method is used internally by :meth:`~maap.maap.MAAP.submitJob` + This method is used internally by :meth:`~maap.maap.MAAP.submit_job` to automatically include the username with job submissions. See Also diff --git a/maap/Result.py b/maap/Result.py index 2f96012..39e4abb 100644 --- a/maap/Result.py +++ b/maap/Result.py @@ -21,7 +21,7 @@ from maap.maap import MAAP maap = MAAP() - granules = maap.searchGranule(short_name='GEDI02_A', limit=5) + granules = maap.search_granule(short_name='GEDI02_A', limit=5) for granule in granules: # Get download URLs @@ -439,7 +439,7 @@ class Collection(Result): -------- Search for collections:: - >>> collections = maap.searchCollection(short_name='GEDI02_A') + >>> collections = maap.search_collection(short_name='GEDI02_A') >>> for c in collections: ... print(c['Collection']['ShortName']) ... 
print(c['Collection']['Description']) @@ -458,7 +458,7 @@ class Collection(Result): See Also -------- :class:`Granule` : Individual data file results - :meth:`maap.maap.MAAP.searchCollection` : Search for collections + :meth:`maap.maap.MAAP.search_collection` : Search for collections """ def __init__(self, metaResult, maap_host): @@ -512,7 +512,7 @@ class Granule(Result): -------- Search and access granule metadata:: - >>> granules = maap.searchGranule(short_name='GEDI02_A', limit=5) + >>> granules = maap.search_granule(short_name='GEDI02_A', limit=5) >>> granule = granules[0] >>> print(granule['Granule']['GranuleUR']) @@ -540,7 +540,7 @@ class Granule(Result): See Also -------- :class:`Collection` : Dataset metadata results - :meth:`maap.maap.MAAP.searchGranule` : Search for granules + :meth:`maap.maap.MAAP.search_granule` : Search for granules """ def __init__( diff --git a/maap/__init__.py b/maap/__init__.py index a436ba3..adf9538 100644 --- a/maap/__init__.py +++ b/maap/__init__.py @@ -24,7 +24,7 @@ maap = MAAP() # Search for granules - granules = maap.searchGranule( + granules = maap.search_granule( short_name='GEDI02_A', limit=10 ) @@ -34,7 +34,7 @@ local_path = granule.getData(destpath='/tmp') # Submit a job - job = maap.submitJob( + job = maap.submit_job( identifier='my_job', algo_id='my_algorithm', version='main', diff --git a/maap/config_reader.py b/maap/config_reader.py index e2f82e3..5e6423e 100644 --- a/maap/config_reader.py +++ b/maap/config_reader.py @@ -261,6 +261,9 @@ def __init__(self, maap_host): self.algorithm_build = self._get_api_endpoint("algorithm_build") self.mas_algo = self._get_api_endpoint("mas_algo") self.dps_job = self._get_api_endpoint("dps_job") + self.processes_ogc = self._get_api_endpoint("processes_ogc") + self.deployment_jobs_ogc = self._get_api_endpoint("deployment_jobs_ogc") + self.jobs_ogc = self._get_api_endpoint("jobs_ogc") self.member_dps_token = self._get_api_endpoint("member_dps_token") self.requester_pays = 
self._get_api_endpoint("requester_pays") self.edc_credentials = self._get_api_endpoint("edc_credentials") diff --git a/maap/dps/dps_job.py b/maap/dps/dps_job.py index 6c395dd..47d7255 100644 --- a/maap/dps/dps_job.py +++ b/maap/dps/dps_job.py @@ -21,7 +21,7 @@ maap = MAAP() # Submit a job - job = maap.submitJob( + job = maap.submit_job( identifier='my_analysis', algo_id='my_algorithm', version='main', @@ -38,13 +38,14 @@ See Also -------- -:meth:`maap.maap.MAAP.submitJob` : Submit a new job -:meth:`maap.maap.MAAP.getJob` : Retrieve an existing job +:meth:`maap.maap.MAAP.submit_job` : Submit a new job +:meth:`maap.maap.MAAP.get_job` : Retrieve an existing job """ import json import logging import os +import time import xml.etree.ElementTree as ET import backoff from urllib.parse import urljoin @@ -131,12 +132,12 @@ class DPSJob: -------- Get job status:: - >>> job = maap.getJob('f3780917-92c0-4440-8a84-9b28c2e64fa8') + >>> job = maap.get_job('f3780917-92c0-4440-8a84-9b28c2e64fa8') >>> print(f"Status: {job.status}") Wait for completion:: - >>> job = maap.submitJob(...) + >>> job = maap.submit_job(...) >>> job.wait_for_completion() >>> print(f"Final status: {job.status}") @@ -159,7 +160,7 @@ class DPSJob: See Also -------- - :meth:`maap.maap.MAAP.submitJob` : Submit new jobs + :meth:`maap.maap.MAAP.submit_job` : Submit new jobs :meth:`maap.maap.MAAP.listJobs` : List all jobs """ @@ -227,13 +228,20 @@ def retrieve_status(self): return self.status @backoff.on_exception(backoff.expo, Exception, max_value=64, max_time=172800) - def wait_for_completion(self): + def wait_for_completion(self, initial_delay_seconds=5): """ Wait for the job to complete. Blocks execution until the job finishes (succeeds, fails, or is cancelled). Uses exponential backoff to poll for status updates. + Parameters + ---------- + initial_delay_seconds : float, optional + Initial delay in seconds before first status check (default: 5). 
+ This accounts for the time required for newly submitted jobs to + become visible in the database. Set to 0 to disable initial delay. + Returns ------- DPSJob @@ -243,7 +251,7 @@ def wait_for_completion(self): -------- :: - >>> job = maap.submitJob(...) + >>> job = maap.submit_job(...) >>> job.wait_for_completion() >>> if job.status == 'Succeeded': ... print("Job completed successfully!") @@ -254,14 +262,22 @@ def wait_for_completion(self): - Uses exponential backoff with max interval of 64 seconds - Maximum wait time is 48 hours (172800 seconds) - The job object is updated with final status upon completion + - Initial delay accounts for database visibility delay (~5 seconds) See Also -------- :meth:`retrieve_status` : Check status without blocking :meth:`cancel_job` : Cancel a running job """ + # Wait before first check to allow newly submitted jobs to appear in database + if initial_delay_seconds > 0: + logger.debug(f'Waiting {initial_delay_seconds} seconds before first status check') + time.sleep(initial_delay_seconds) + self.retrieve_status() - if self.status.lower() in ["accepted", "running"]: + # Known terminal states and states that should trigger retries + terminal_states = ["succeeded", "failed", "dismissed", "deduped", "offline"] + if self.status.lower() not in terminal_states: logger.debug('Current Status is {}. Backing off.'.format(self.status)) raise RuntimeError return self diff --git a/maap/maap.py b/maap/maap.py index 22a805b..6c1298a 100644 --- a/maap/maap.py +++ b/maap/maap.py @@ -23,7 +23,7 @@ maap = MAAP() # Search for granules - granules = maap.searchGranule( + granules = maap.search_granule( short_name='GEDI02_A', limit=10 ) @@ -111,7 +111,7 @@ class MAAP(object): Search for granules:: - >>> granules = maap.searchGranule( + >>> granules = maap.search_granule( ... short_name='GEDI02_A', ... bounding_box='-122.5,37.5,-121.5,38.5', ... 
limit=5 @@ -121,7 +121,7 @@ class MAAP(object): Submit a job:: - >>> job = maap.submitJob( + >>> job = maap.submit_job( ... identifier='my_analysis', ... algo_id='my_algorithm', ... version='main', @@ -226,12 +226,32 @@ def _upload_s3(self, filename, bucket, objectKey): Notes ----- - This is an internal method primarily used by :meth:`uploadFiles`. + This is an internal method primarily used by :meth:`upload_files`. It uses the boto3 S3 client configured at module level. """ return s3_client.upload_file(filename, bucket, objectKey) + + def searchGranule(self): + """ + Search for granules in the CMR (Common Metadata Repository). + + Queries the CMR database for granules matching the specified criteria. + Granules represent individual data files within a collection. + + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: + + search_granule(limit=20, **kwargs) + + limit : int, optional + Maximum number of results to return. Default is 20. + **kwargs : dict + Search parameters to filter results. Common parameters include: - def searchGranule(self, limit=20, **kwargs): + """ + raise Exception("searchGranule() is no longer supported. Use search_granule(limit=20, **kwargs) instead.") + + + def search_granule(self, limit=20, **kwargs): """ Search for granules in the CMR (Common Metadata Repository). @@ -275,14 +295,14 @@ def searchGranule(self, limit=20, **kwargs): -------- Search by collection name:: - >>> granules = maap.searchGranule( + >>> granules = maap.search_granule( ... short_name='GEDI02_A', ... limit=10 ... ) Search with spatial bounds:: - >>> granules = maap.searchGranule( + >>> granules = maap.search_granule( ... collection_concept_id='C1234567890-MAAP', ... bounding_box='-122.5,37.5,-121.5,38.5', ... limit=5 @@ -290,7 +310,7 @@ def searchGranule(self, limit=20, **kwargs): Search with temporal filter:: - >>> granules = maap.searchGranule( + >>> granules = maap.search_granule( ... short_name='AFLVIS2', ... 
temporal='2019-01-01T00:00:00Z,2019-12-31T23:59:59Z', ... limit=100 @@ -298,7 +318,7 @@ def searchGranule(self, limit=20, **kwargs): Search with pattern matching:: - >>> granules = maap.searchGranule( + >>> granules = maap.search_granule( ... readable_granule_name='*2019*', ... short_name='GEDI02_A' ... ) @@ -317,7 +337,7 @@ def searchGranule(self, limit=20, **kwargs): See Also -------- - :meth:`searchCollection` : Search for collections + :meth:`search_collection` : Search for collections :class:`~maap.Result.Granule` : Granule result class """ results = self._CMR.get_search_results(url=self.config.search_granule_url, limit=limit, **kwargs) @@ -327,8 +347,33 @@ def searchGranule(self, limit=20, **kwargs): self.config.search_granule_url, self._get_api_header(), self._DPS) for result in results][:limit] + + + def downloadGranule(self): + """ + Download a granule directly from an HTTP URL. + + Downloads data from an Earthdata HTTP URL, handling both public and + protected (authenticated) resources automatically. + + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: + + download_granule(online_access_url, destination_path=".", overwrite=False) + + online_access_url : str + The HTTP URL of the granule to download. This is typically obtained + from a granule's ``OnlineAccessURL`` field. + destination_path : str, optional + Directory path where the file will be saved. Default is the current + working directory (``'.'``). + overwrite : bool, optional + If ``True``, overwrite existing files. If ``False`` (default), skip + download if the file already exists. + + """ + raise Exception("downloadGranule() is no longer supported. Use download_granule(online_access_url, destination_path=\".\", overwrite=False) instead.") - def downloadGranule(self, online_access_url, destination_path=".", overwrite=False): + def download_granule(self, online_access_url, destination_path=".", overwrite=False): """ Download a granule directly from an HTTP URL. 
@@ -356,7 +401,7 @@ def downloadGranule(self, online_access_url, destination_path=".", overwrite=Fal -------- Download a granule by URL:: - >>> local_file = maap.downloadGranule( + >>> local_file = maap.download_granule( ... 'https://data.maap-project.org/file/data.h5', ... destination_path='/tmp/downloads' ... ) @@ -364,7 +409,7 @@ def downloadGranule(self, online_access_url, destination_path=".", overwrite=Fal Force overwrite of existing files:: - >>> local_file = maap.downloadGranule( + >>> local_file = maap.download_granule( ... url, ... destination_path='/tmp', ... overwrite=True @@ -383,7 +428,7 @@ def downloadGranule(self, online_access_url, destination_path=".", overwrite=Fal See Also -------- - :meth:`searchGranule` : Search for granules + :meth:`search_granule` : Search for granules :meth:`~maap.Result.Granule.getData` : Download granule data """ filename = os.path.basename(urllib.parse.urlparse(online_access_url).path) @@ -396,8 +441,32 @@ def downloadGranule(self, online_access_url, destination_path=".", overwrite=Fal proxy._apiHeader = self._get_api_header() # noinspection PyProtectedMember return proxy._getHttpData(online_access_url, overwrite, final_destination) + + + def getCallFromEarthdataQuery(self): + """ + Generate a MAAP API call string from an Earthdata search query. + + Converts a JSON-formatted Earthdata search query into a Python code + string that can be used to call the MAAP API. + + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: + + get_call_from_earthdata_query(query, variable_name='maap', limit=1000) + + query : str + A JSON-formatted string representing an Earthdata search query. + This is the format used by the Earthdata Search application. + variable_name : str, optional + The variable name to use in the generated code for the MAAP + client instance. Default is ``'maap'``. + limit : int, optional + Maximum number of records to return. Default is 1000. 
+ + """ + raise Exception("getCallFromEarthdataQuery() is no longer supported. Use get_call_from_earthdata_query(query, variable_name='maap', limit=1000) instead.") - def getCallFromEarthdataQuery(self, query, variable_name='maap', limit=1000): + def get_call_from_earthdata_query(self, query, variable_name='maap', limit=1000): """ Generate a MAAP API call string from an Earthdata search query. @@ -426,9 +495,9 @@ def getCallFromEarthdataQuery(self, query, variable_name='maap', limit=1000): Convert an Earthdata query:: >>> query = '{"instrument_h": ["GEDI"], "bounding_box": "-180,-90,180,90"}' - >>> code = maap.getCallFromEarthdataQuery(query) + >>> code = maap.get_call_from_earthdata_query(query) >>> print(code) - maap.searchGranule(instrument="GEDI", bounding_box="-180,-90,180,90", limit=1000) + maap.search_granule(instrument="GEDI", bounding_box="-180,-90,180,90", limit=1000) Notes ----- @@ -438,12 +507,34 @@ def getCallFromEarthdataQuery(self, query, variable_name='maap', limit=1000): See Also -------- - :meth:`getCallFromCmrUri` : Generate call from CMR URI - :meth:`searchGranule` : Execute a granule search + :meth:`get_call_from_cmr_uri` : Generate call from CMR URI + :meth:`search_granule` : Execute a granule search """ return self._CMR.generateGranuleCallFromEarthDataRequest(query, variable_name, limit) - def getCallFromCmrUri(self, search_url, variable_name='maap', limit=1000, search='granule'): + def getCallFromCmrUri(self): + """ + Generate a MAAP API call string from a CMR REST API URL. + + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: + + get_call_from_cmr_uri(search_url, variable_name='maap', limit=1000, search='granule') + + search_url : str + A CMR REST API search URL. This can be copied directly from + the CMR API or browser address bar. + variable_name : str, optional + The variable name to use in the generated code for the MAAP + client instance. Default is ``'maap'``. 
+ limit : int, optional + Maximum number of records to return. Default is 1000. + search : str, optional + Type of search to perform. Either ``'granule'`` (default) or + ``'collection'``. + """ + raise Exception("getCallFromCmrUri() is no longer supported. Use get_call_from_cmr_uri(search_url, variable_name='maap', limit=1000, search='granule') instead.") + + def get_call_from_cmr_uri(self, search_url, variable_name='maap', limit=1000, search='granule'): """ Generate a MAAP API call string from a CMR REST API URL. @@ -475,16 +566,16 @@ def getCallFromCmrUri(self, search_url, variable_name='maap', limit=1000, search Convert a CMR granule search URL:: >>> url = 'https://cmr.earthdata.nasa.gov/search/granules?short_name=GEDI02_A' - >>> code = maap.getCallFromCmrUri(url) + >>> code = maap.get_call_from_cmr_uri(url) >>> print(code) - maap.searchGranule(short_name="GEDI02_A", limit=1000) + maap.search_granule(short_name="GEDI02_A", limit=1000) Convert a collection search:: >>> url = 'https://cmr.earthdata.nasa.gov/search/collections?provider=MAAP' - >>> code = maap.getCallFromCmrUri(url, search='collection') + >>> code = maap.get_call_from_cmr_uri(url, search='collection') >>> print(code) - maap.searchCollection(provider="MAAP", limit=1000) + maap.search_collection(provider="MAAP", limit=1000) Notes ----- @@ -494,13 +585,45 @@ def getCallFromCmrUri(self, search_url, variable_name='maap', limit=1000, search See Also -------- - :meth:`getCallFromEarthdataQuery` : Generate call from Earthdata query - :meth:`searchGranule` : Execute a granule search - :meth:`searchCollection` : Execute a collection search + :meth:`get_call_from_earthdata_query` : Generate call from Earthdata query + :meth:`search_granule` : Execute a granule search + :meth:`search_collection` : Execute a collection search """ return self._CMR.generateCallFromEarthDataQueryString(search_url, variable_name, limit, search) - def searchCollection(self, limit=100, **kwargs): + def searchCollection(self): + """ + 
Search for collections in the CMR (Common Metadata Repository). + + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: + + search_collection(limit=100, **kwargs) + + limit : int, optional + Maximum number of results to return. Default is 100. + **kwargs : dict + Search parameters to filter results. Common parameters include: + + short_name : str + Collection short name (e.g., 'GEDI02_A'). + concept_id : str + Unique CMR collection identifier. + provider : str + Data provider (e.g., 'MAAP', 'LPDAAC_ECS'). + keyword : str + Keyword search across collection metadata. + instrument : str + Filter by instrument name. + platform : str + Filter by platform name. + project : str + Filter by project name. + processing_level_id : str + Filter by data processing level. + """ + raise Exception("searchCollection() is no longer supported. Use search_collection(limit=100, **kwargs) instead.") + + def search_collection(self, limit=100, **kwargs): """ Search for collections in the CMR (Common Metadata Repository). @@ -541,20 +664,20 @@ def searchCollection(self, limit=100, **kwargs): -------- Search by short name:: - >>> collections = maap.searchCollection(short_name='GEDI02_A') + >>> collections = maap.search_collection(short_name='GEDI02_A') >>> for c in collections: ... print(c['Collection']['ShortName']) Search by provider:: - >>> collections = maap.searchCollection( + >>> collections = maap.search_collection( ... provider='MAAP', ... limit=50 ... ) Search by keyword:: - >>> collections = maap.searchCollection( + >>> collections = maap.search_collection( ... keyword='biomass forest', ... limit=20 ... ) @@ -562,17 +685,17 @@ def searchCollection(self, limit=100, **kwargs): Notes ----- Collections contain metadata about datasets but not the actual data - files. Use :meth:`searchGranule` to find individual data files within + files. Use :meth:`search_granule` to find individual data files within a collection. 
See Also -------- - :meth:`searchGranule` : Search for granules within collections + :meth:`search_granule` : Search for granules within collections :class:`~maap.Result.Collection` : Collection result class """ results = self._CMR.get_search_results(url=self.config.search_collection_url, limit=limit, **kwargs) return [Collection(result, self.config.maap_host) for result in results][:limit] - + def getQueues(self): """ Get available DPS processing queues (resources). @@ -581,6 +704,20 @@ def getQueues(self): used for algorithm execution. Different queues provide different amounts of memory and CPU. + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: + + get_queues() + """ + raise Exception("getQueues() is no longer supported. Use get_queues() instead.") + + def get_queues(self): + """ + Get available DPS processing queues (resources). + + Retrieves a list of available compute resources (queues) that can be + used for algorithm execution. Different queues provide different + amounts of memory and CPU. + Returns ------- requests.Response @@ -591,7 +728,7 @@ def getQueues(self): -------- List available queues:: - >>> response = maap.getQueues() + >>> response = maap.get_queues() >>> queues = response.json() >>> for queue in queues: ... print(f"{queue['name']}: {queue['memory']} RAM") @@ -603,8 +740,8 @@ def getQueues(self): See Also -------- - :meth:`submitJob` : Submit a job to a queue - :meth:`registerAlgorithm` : Register an algorithm to run on queues + :meth:`submit_job` : Submit a job to a queue + :meth:`deploy_algorithm_from_cwl_file` : Deploy an algorithm to run on queues """ url = os.path.join(self.config.algorithm_register, 'resource') headers = self._get_api_header() @@ -617,973 +754,625 @@ def getQueues(self): ) return response - def registerAlgorithm(self, arg): + def deploy_algorithm_from_cwl_file(self, file_path): """ - Register an algorithm with the MAAP DPS. 
- - Registers a new algorithm configuration that can be executed on the - MAAP Data Processing System (DPS). + Deploys an algorithm from a CWL file + """ + # Read raw text from CWL file + with open(file_path, 'r') as f: + raw_text = f.read() + process = { + "cwlRawText": raw_text + } + headers = self._get_api_header(content_type='application/json') + logger.debug('POST request sent to {}'.format(self.config.processes_ogc)) + response = requests.post( + url=self.config.processes_ogc, + headers=headers, + json=process + ) + return response - Parameters - ---------- - arg : dict or str - Algorithm configuration as a dictionary or JSON string. Required - fields include: - - algorithm_name : str - Unique name for the algorithm. - code_version : str - Version identifier (e.g., Git branch or tag). - algorithm_description : str - Human-readable description. - docker_container_url : str - URL of the Docker container image. - script_command : str - Command to execute inside the container. - inputs : list of dict - Input parameter definitions with ``field`` and ``download`` keys. Format should be like - {'file': [{'name': 'input_file'}],'config': [{'name': 'config_param'}],'positional': [{'name': 'pos_arg'}]} - repo_url : str - Git repository URL for the algorithm source code. + def replace_algorithm_from_cwl_file(self, process_id, file_path): + """ + Replaces the deployed algorithm identified by process_id with the contents of a CWL file + """ + # Read raw text from CWL file + with open(file_path, 'r') as f: + raw_text = f.read() + process = { + "cwlRawText": raw_text + } + url = os.path.join(self.config.processes_ogc, str(process_id)) + headers = self._get_api_header(content_type='application/json') + logger.debug('PUT request sent to {}'.format(url)) + response = requests.put( + url=url, + headers=headers, + json=process + ) + return response - Returns - ------- - requests.Response - HTTP response indicating success or failure of registration. + def uploadFiles(self): + """ + Upload files to MAAP shared storage. 
- Examples - -------- - Register using a dictionary:: - - >>> config = { - ... 'algorithm_name': 'my_algorithm', - ... 'code_version': 'main', - ... 'algorithm_description': 'Processes satellite data', - ... 'docker_container_url': 'registry/image:tag', - ... 'script_command': 'python run.py', - ... 'inputs': { - ... 'file': [{'name': 'input_file'}], - ... 'config': [{'name': 'config_param'}], - ... 'positional': [{'name': 'pos_arg'}] - ... }, - ... 'repo_url': 'https://github.com/org/repo' - ... } - >>> response = maap.registerAlgorithm(config) - - Register using a JSON string:: - - >>> import json - >>> response = maap.registerAlgorithm(json.dumps(config)) + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: - Notes - ----- - After registration, algorithms need to be built before they can be - executed. The build process creates the Docker image on the DPS - infrastructure. + upload_files(filenames) - See Also - -------- - :meth:`register_algorithm_from_yaml_file` : Register from YAML file - :meth:`listAlgorithms` : List registered algorithms - :meth:`deleteAlgorithm` : Delete an algorithm - """ - logger.debug('Registering algorithm with args ') - if type(arg) is dict: - arg = json.dumps(arg) - logger.debug(arg) - response = requests_utils.make_request(url=self.config.algorithm_register, config=self.config, - content_type='application/json', request_type=requests_utils.POST, - data=arg) - logger.debug('POST request sent to {}'.format(self.config.algorithm_register)) - return response + filenames : list of str + List of local file paths to upload. + """ + raise Exception("uploadFiles() is no longer supported. Use upload_files(filenames) instead.") - def register_algorithm_from_yaml_file(self, file_path): + def upload_files(self, filenames): """ - Register an algorithm from a YAML configuration file. + Upload files to MAAP shared storage. - Reads algorithm configuration from a YAML file and registers it with - the MAAP DPS. 
+ Uploads local files to an S3 staging directory where they can be + accessed by other MAAP users or used as inputs to DPS jobs. Parameters ---------- - file_path : str - Path to the YAML configuration file. + filenames : list of str + List of local file paths to upload. Returns ------- - requests.Response - HTTP response indicating success or failure of registration. + str + A message containing the UUID of the upload directory. This UUID + is needed to share the files with other users. Examples -------- - Register from a YAML file:: - - >>> response = maap.register_algorithm_from_yaml_file('algorithm.yaml') - - Example YAML file structure:: - - algorithm_name: my_algorithm - code_version: main - algorithm_description: Process satellite data - docker_container_url: registry/image:tag - script_command: python run.py - inputs: - file: - - name: input_file - config: - - name: config_param - positional: - - name: pos_arg - repo_url: https://github.com/org/repo - - See Also - -------- - :meth:`registerAlgorithm` : Register from dict or JSON - :meth:`register_algorithm_from_yaml_file_backwards_compatible` : Legacy format - """ - algo_config = algorithm_utils.read_yaml_file(file_path) - return self.registerAlgorithm(algo_config) - - def register_algorithm_from_yaml_file_backwards_compatible(self, file_path): - """ - Register an algorithm from a legacy YAML configuration file. + Upload files to share:: - Reads algorithm configuration from an older YAML format and converts - it to the current format before registration. + >>> result = maap.upload_files(['data.csv', 'config.json']) + >>> print(result) + Upload file subdirectory: a1b2c3d4-e5f6-... (keep a record of...) - Parameters - ---------- - file_path : str - Path to the legacy YAML configuration file. + Upload a single file:: - Returns - ------- - requests.Response - HTTP response indicating success or failure of registration. 
+ >>> result = maap.upload_files(['output.tif']) Notes ----- - This method supports the legacy YAML format with different field names: - - - ``algo_name`` -> ``algorithm_name`` - - ``version`` -> ``code_version`` - - ``environment`` -> ``environment_name`` - - ``description`` -> ``algorithm_description`` - - ``docker_url`` -> ``docker_container_url`` - - ``inputs`` -> ``algorithm_params`` - - ``run_command`` -> ``script_command`` - - ``repository_url`` -> ``repo_url`` + - Files are uploaded to a unique subdirectory identified by a UUID + - Save the UUID to share the upload location with collaborators + - The upload location can be used as input to DPS jobs See Also -------- - :meth:`register_algorithm_from_yaml_file` : Current format - :meth:`registerAlgorithm` : Register from dict - """ - algo_yaml = algorithm_utils.read_yaml_file(file_path) - key_map = {"algo_name": "algorithm_name", "version": "code_version", "environment": "environment_name", - "description": "algorithm_description", "docker_url": "docker_container_url", - "inputs": "algorithm_params", "run_command": "script_command", "repository_url": "repo_url"} - output_config = {} - for key, value in algo_yaml.items(): - if key in key_map: - if key == "inputs": - inputs = [] - for argument in value: - inputs.append({"field": argument.get("name"), "download": argument.get("download")}) - output_config.update({"algorithm_params": inputs}) - else: - output_config.update({key_map.get(key): value}) - else: - output_config.update({key: value}) - logger.debug("Registering with config %s " % json.dumps(output_config)) - return self.registerAlgorithm(json.dumps(output_config)) + :meth:`submit_job` : Use uploaded files as job inputs + """ + bucket = self.config.s3_user_upload_bucket + prefix = self.config.s3_user_upload_dir + uuid_dir = uuid.uuid4() + for filename in filenames: + basename = os.path.basename(filename) + response = self._upload_s3(filename, bucket, f"{prefix}/{uuid_dir}/{basename}") + return f"Upload 
file subdirectory: {uuid_dir} (keep a record of this if you want to share these files with other users)" - def listAlgorithms(self): + def _get_browse(self, granule_ur): """ - List all registered algorithms. + Get browse image metadata for a granule. + + Internal method to retrieve browse image information for visualization. - Retrieves a list of all algorithms registered by the current user - on the MAAP DPS. + Parameters + ---------- + granule_ur : str + The Granule Universal Reference identifier. Returns ------- requests.Response - HTTP response containing JSON list of algorithms. Each algorithm - entry includes name, version, description, and status information. - - Examples - -------- - List all algorithms:: - - >>> response = maap.listAlgorithms() - >>> algorithms = response.json() - >>> for algo in algorithms: - ... print(f"{algo['algorithm_name']}:{algo['code_version']}") - - See Also - -------- - :meth:`describeAlgorithm` : Get details for specific algorithm - :meth:`registerAlgorithm` : Register a new algorithm - :meth:`deleteAlgorithm` : Delete an algorithm + HTTP response containing browse image metadata. """ - url = self.config.mas_algo - headers = self._get_api_header() - logger.debug('GET request sent to {}'.format(url)) - logger.debug('headers:') - logger.debug(headers) response = requests.get( - url=url, - headers=headers + url=f'{self.config.wmts}/GetTile', + params=dict(granule_ur=granule_ur), + headers=dict(Accept='application/json') ) return response - def describeAlgorithm(self, algoid): + def _get_capabilities(self, granule_ur): """ - Get detailed information about a registered algorithm. + Get WMTS capabilities for a granule. - Retrieves the full configuration and status of a specific algorithm. + Internal method to retrieve Web Map Tile Service capabilities + for visualization. Parameters ---------- - algoid : str - The algorithm identifier, typically in the format - ``algorithm_name:code_version``. 
+ granule_ur : str + The Granule Universal Reference identifier. Returns ------- requests.Response - HTTP response containing JSON with algorithm details including - configuration, build status, and parameter definitions. - - Examples - -------- - Get algorithm details:: - - >>> response = maap.describeAlgorithm('my_algorithm:main') - >>> details = response.json() - >>> print(f"Description: {details['algorithm_description']}") - >>> print(f"Docker: {details['docker_container_url']}") - - See Also - -------- - :meth:`listAlgorithms` : List all algorithms - :meth:`publishAlgorithm` : Publish an algorithm + HTTP response containing WMTS capabilities XML. """ - url = os.path.join(self.config.mas_algo, algoid) - headers = self._get_api_header() - logger.debug('GET request sent to {}'.format(url)) - logger.debug('headers:') - logger.debug(headers) response = requests.get( - url=url, - headers=headers + url=f'{self.config.wmts}/GetCapabilities', + params=dict(granule_ur=granule_ur), + headers=dict(Accept='application/json') ) return response - def publishAlgorithm(self, algoid): + def show(self, granule, display_config={}): """ - Publish an algorithm for public use. + Display a granule on an interactive map. - Makes a registered algorithm available for other MAAP users to - discover and execute. + Renders the granule data as a tile layer on an interactive Mapbox + map in a Jupyter notebook environment. Parameters ---------- - algoid : str - The algorithm identifier to publish, typically in the format - ``algorithm_name:code_version``. + granule : dict + A granule result dictionary, typically obtained from + :meth:`search_granule`. Must contain ``Granule.GranuleUR``. + display_config : dict, optional + Configuration options for rendering. Common options include: - Returns - ------- - requests.Response - HTTP response indicating success or failure of publication. + rescale : str + Value range for color scaling (e.g., ``'0,70'``). 
+ color_map : str + Color palette name (e.g., ``'schwarzwald'``). Examples -------- - Publish an algorithm:: + Display a granule on a map:: + + >>> granules = maap.search_granule(short_name='AFLVIS2', limit=1) + >>> maap.show(granules[0]) + + Display with custom rendering:: - >>> response = maap.publishAlgorithm('my_algorithm:v1.0') - >>> if response.ok: - ... print("Algorithm published successfully") + >>> maap.show(granule, display_config={ + ... 'rescale': '0,100', + ... 'color_map': 'viridis' + ... }) Notes ----- - Published algorithms are visible to all MAAP users and can be - executed by anyone with DPS access. + - Requires ``mapboxgl`` package and a Jupyter notebook environment + - Uses the MAAP tile server for rendering + - A Mapbox access token must be configured See Also -------- - :meth:`registerAlgorithm` : Register an algorithm - :meth:`deleteAlgorithm` : Delete an algorithm + :meth:`search_granule` : Search for granules to visualize """ - url = self.config.mas_algo.replace('algorithm', 'publish') - headers = self._get_api_header() - body = { "algo_id": algoid} - logger.debug('POST request sent to {}'.format(url)) - logger.debug('headers:') - logger.debug(headers) - logger.debug('body:') - logger.debug(body) - response = requests.post( - url=url, - headers=headers, - data=body + from mapboxgl.viz import RasterTilesViz + + granule_ur = granule['Granule']['GranuleUR'] + browse_file = json.loads(self._get_browse(granule_ur).text)['browse'] + capabilities = json.loads(self._get_capabilities(granule_ur).text)['body'] + presenter = Presenter(capabilities, display_config) + query_params = dict(url=browse_file, **presenter.display_config) + qs = urllib.parse.urlencode(query_params) + tiles_url = f"{self.config.tiler_endpoint}/tiles/{{z}}/{{x}}/{{y}}.png?{qs}" + viz = RasterTilesViz( + tiles_url, + height='800px', + zoom=10, + access_token=self.config.mapbox_token, + tiles_size=256, + tiles_bounds=presenter.bbox, + center=(presenter.lng, presenter.lat), + 
tiles_minzoom=presenter.minzoom, + tiles_maxzoom=presenter.maxzoom, ) - return response + viz.show() - def deleteAlgorithm(self, algoid): + # OGC-compliant endpoint functions + def listAlgorithms(self): """ - Delete a registered algorithm. + Search all OGC processes. - Removes an algorithm registration from the MAAP DPS. This does not - affect any completed jobs that used the algorithm. + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: - Parameters - ---------- - algoid : str - The algorithm identifier to delete, typically in the format - ``algorithm_name:code_version``. - - Returns - ------- - requests.Response - HTTP response indicating success or failure of deletion. + list_algorithms() + """ + raise Exception("listAlgorithms() is no longer supported. Use list_algorithms() instead.") - Examples - -------- - Delete an algorithm:: + def list_algorithms(self): + """ + Search all OGC processes + :return: Response object with all deployed processes + """ + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(self.config.processes_ogc)) - >>> response = maap.deleteAlgorithm('my_algorithm:main') - >>> if response.ok: - ... print("Algorithm deleted") + response = requests.get( + url=self.config.processes_ogc, + headers=headers + ) + return response - Warnings - -------- - This action cannot be undone. The algorithm configuration will be - permanently removed. 
+ def deploy_algorithm(self, execution_unit_href): + """ + Deploy a new OGC process + :param execution_unit_href: URL to the CWL file + :return: Response object with deployment information + """ + headers = self._get_api_header(content_type='application/json') + data = { + "executionUnit": { + "href": execution_unit_href + } + } + logger.debug('POST request sent to {}'.format(self.config.processes_ogc)) + response = requests.post( + url=self.config.processes_ogc, + headers=headers, + json=data + ) + return response - See Also - -------- - :meth:`registerAlgorithm` : Register an algorithm - :meth:`listAlgorithms` : List algorithms + def get_deployment_status(self, deployment_id): """ - url = os.path.join(self.config.mas_algo, algoid) + Query the current status of an algorithm being deployed + :param deployment_id: The deployment job ID + :return: Response object with deployment status + """ + url = os.path.join(self.config.deployment_jobs_ogc, str(deployment_id)) headers = self._get_api_header() - logger.debug('DELETE request sent to {}'.format(url)) - logger.debug('headers:') - logger.debug(headers) - response = requests.delete( + logger.debug('GET request sent to {}'.format(url)) + response = requests.get( url=url, headers=headers ) return response - - def getJob(self, jobid): + def describeAlgorithm(self): """ - Get a DPS job with all available attributes. + Get detailed information about a specific OGC process. - Retrieves a job object with its current status, results (if available), - and metrics (if available). + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: - Parameters - ---------- - jobid : str - The unique job identifier (UUID). + describe_algorithm(process_id) - Returns - ------- - DPSJob - A :class:`~maap.dps.dps_job.DPSJob` object with populated attributes - including status, outputs, and metrics. 
- - Examples - -------- - Get a job and inspect its status:: - - >>> job = maap.getJob('f3780917-92c0-4440-8a84-9b28c2e64fa8') - >>> print(f"Status: {job.status}") - >>> print(f"Outputs: {job.outputs}") - >>> print(f"Duration: {job.job_duration_seconds} seconds") - - See Also - -------- - :meth:`getJobStatus` : Get status only - :meth:`getJobResult` : Get results only - :meth:`getJobMetrics` : Get metrics only - :meth:`submitJob` : Submit a new job + process_id: The process ID to describe """ - job = DPSJob(self.config) - job.id = jobid - job.retrieve_attributes() - return job + raise Exception("describeAlgorithm() is no longer supported. Use describe_algorithm(process_id) instead.") - def getJobStatus(self, jobid): + def describe_algorithm(self, process_id): + """ + Get detailed information about a specific OGC process + :param process_id: The process ID to describe + :return: Response object with process details """ - Get the current status of a DPS job. + url = os.path.join(self.config.processes_ogc, str(process_id)) + headers = self._get_api_header() + response = requests.get( + url=url, + headers=headers + ) + return response - Parameters - ---------- - jobid : str - The unique job identifier (UUID). + def update_algorithm(self, process_id, execution_unit_href): + """ + Replace an existing OGC process (must be the original deployer) + :param process_id: The process ID to update + :param execution_unit_href: URL to the new CWL file + :return: Response object with update information + """ + url = os.path.join(self.config.processes_ogc, str(process_id)) + headers = self._get_api_header(content_type='application/json') + data = { + "executionUnit": { + "href": execution_unit_href + } + } + logger.debug('PUT request sent to {}'.format(url)) + response = requests.put( + url=url, + headers=headers, + json=data + ) + return response - Returns - ------- - str - The job status. 
Possible values are: + def deleteAlgorithm(self): + """ + Delete an existing OGC process (must be the original deployer). - - ``'Accepted'``: Job is queued - - ``'Running'``: Job is executing - - ``'Succeeded'``: Job completed successfully - - ``'Failed'``: Job failed - - ``'Dismissed'``: Job was cancelled + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: - Examples - -------- - Check job status:: + delete_algorithm(process_id) - >>> status = maap.getJobStatus('f3780917-92c0-4440-8a84-9b28c2e64fa8') - >>> print(f"Job status: {status}") + process_id: The process ID to delete + """ + raise Exception("deleteAlgorithm() is no longer supported. Use delete_algorithm(process_id) instead.") - See Also - -------- - :meth:`getJob` : Get full job object - :meth:`cancelJob` : Cancel a running job + def delete_algorithm(self, process_id): """ - job = DPSJob(self.config) - job.id = jobid - return job.retrieve_status() + Delete an existing OGC process (must be the original deployer) + :param process_id: The process ID to delete + :return: Response object with deletion confirmation + """ + url = os.path.join(self.config.processes_ogc, str(process_id)) + headers = self._get_api_header() + logger.debug('DELETE request sent to {}'.format(url)) + response = requests.delete( + url=url, + headers=headers + ) + return response - def getJobResult(self, jobid): + def get_algorithm_package(self, process_id): + """ + Access the formal description that can be used to deploy an OGC process + :param process_id: The process ID + :return: Response object with process package description """ - Get the output URLs from a completed DPS job. + url = os.path.join(self.config.processes_ogc, str(process_id), 'package') + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(url)) + response = requests.get( + url=url, + headers=headers + ) + return response + + def submitJob(self): + """ + Submit a job to the MAAP Data Processing System (DPS). 
- Parameters - ---------- - jobid : str - The unique job identifier (UUID). + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: - Returns - ------- - list of str - List of URLs pointing to job output files. Typically includes - HTTP, S3, and console URLs for the output directory. + submit_job(process_id, inputs, queue) - Examples - -------- - Get job outputs:: + process_id: The process ID to execute + inputs: Dictionary of input parameters for the process + queue: Queue to run the job on + """ + raise Exception("submitJob() is no longer supported. Use submit_job(process_id, inputs, queue) instead.") - >>> outputs = maap.getJobResult('f3780917-92c0-4440-8a84-9b28c2e64fa8') - >>> for url in outputs: - ... print(url) - Notes - ----- - This method only returns results for jobs that have completed - (succeeded or failed). For running jobs, the output list will be empty. - See Also - -------- - :meth:`getJob` : Get full job object - :meth:`getJobMetrics` : Get job performance metrics + def submit_job(self, process_id, inputs, queue, dedup=None, tag=None): """ - job = DPSJob(self.config) - job.id = jobid - return job.retrieve_result() - - def getJobMetrics(self, jobid): + Execute an OGC process job + :param process_id: The process ID to execute + :param inputs: Dictionary of input parameters for the process + :param queue: Queue to run the job on + :param dedup: Optional deduplication flag + :param tag: Optional user-defined tag for the job + :return: Response object with job execution information """ - Get performance metrics from a completed DPS job. - - Retrieves resource usage and timing information for a job. - - Parameters - ---------- - jobid : str - The unique job identifier (UUID). 
+ url = os.path.join(self.config.processes_ogc, str(process_id), 'execution') + headers = self._get_api_header(content_type='application/json') + data = { + "inputs": inputs, + "queue": queue + } + if dedup is not None: + data["dedup"] = dedup + if tag is not None: + data["tag"] = tag + + logger.debug('POST request sent to {}'.format(url)) - Returns - ------- - dict - Dictionary containing job metrics including: + response = requests.post( + url=url, + headers=headers, + json=data + ) + return response - - ``machine_type``: EC2 instance type used - - ``job_start_time``: ISO timestamp of job start - - ``job_end_time``: ISO timestamp of job end - - ``job_duration_seconds``: Total execution time - - ``cpu_usage``: CPU time in nanoseconds - - ``mem_usage``: Memory usage in bytes - - ``max_mem_usage``: Peak memory usage in bytes - - ``directory_size``: Output directory size in bytes + def getJobStatus(self): + """ + Get the status of an OGC job. - Examples - -------- - Get job metrics:: + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: - >>> metrics = maap.getJobMetrics('f3780917-92c0-4440-8a84-9b28c2e64fa8') - >>> print(f"Duration: {metrics['job_duration_seconds']} seconds") - >>> print(f"Max memory: {metrics['max_mem_usage']} bytes") + get_job_status(job_id) - See Also - -------- - :meth:`getJob` : Get full job object - :meth:`getJobResult` : Get job outputs + job_id: The job ID to check status for """ - job = DPSJob(self.config) - job.id = jobid - return job.retrieve_metrics() + raise Exception("getJobStatus() is no longer supported. Use get_job_status(job_id) instead.") - def cancelJob(self, jobid): + def get_job_status(self, job_id): """ - Cancel a running or queued DPS job. - - Attempts to stop execution of a job that is currently running or - waiting in the queue. - - Parameters - ---------- - jobid : str - The unique job identifier (UUID) to cancel. 
- - Returns - ------- - str - Response from the DPS indicating the cancellation result. - - Examples - -------- - Cancel a job:: - - >>> result = maap.cancelJob('f3780917-92c0-4440-8a84-9b28c2e64fa8') - >>> print(result) + Get the status of an OGC job + :param job_id: The job ID to check status for + :return: Response object with job status + """ + url = os.path.join(self.config.jobs_ogc, str(job_id)) + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(url)) - Notes - ----- - Jobs that are already completed (Succeeded or Failed) cannot be - cancelled. The job status will be set to ``'Dismissed'`` upon - successful cancellation. + response = requests.get( + url=url, + headers=headers + ) + return response - See Also - -------- - :meth:`submitJob` : Submit a job - :meth:`getJobStatus` : Check job status - """ - job = DPSJob(self.config) - job.id = jobid - return job.cancel_job() - - def listJobs(self, *, - algo_id=None, - end_time=None, - get_job_details=True, - offset=0, - page_size=10, - queue=None, - start_time=None, - status=None, - tag=None, - version=None): + def cancelJob(self): """ - List jobs submitted by the current user. + Cancel a running OGC job or delete a queued job. - Retrieves a paginated list of DPS jobs matching the specified filter - criteria. + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: - Parameters - ---------- - algo_id : str, optional - Filter by algorithm name. Must be provided together with ``version``. - end_time : str, optional - Filter for jobs completed before this time. Format: ISO 8601 - (e.g., ``'2024-01-01'`` or ``'2024-01-01T00:00:00.000000Z'``). - get_job_details : bool, optional - If ``True`` (default), return detailed job information. If ``False``, - return only job IDs and tags for faster response. - offset : int, optional - Number of jobs to skip for pagination. Default is 0. - page_size : int, optional - Number of jobs to return per page. Default is 10. 
- queue : str, optional - Filter by processing queue name. - start_time : str, optional - Filter for jobs started after this time. Format: ISO 8601. - status : str, optional - Filter by job status. Valid values: - - - ``'Accepted'``: Queued jobs - - ``'Running'``: Currently executing - - ``'Succeeded'``: Completed successfully - - ``'Failed'``: Completed with errors - - ``'Dismissed'``: Cancelled jobs - - tag : str, optional - Filter by user-defined job tag/identifier. - version : str, optional - Filter by algorithm version. Must be provided together with ``algo_id``. + cancel_job(job_id, wait_for_completion=False) - Returns - ------- - requests.Response - HTTP response containing JSON list of jobs matching the criteria. + job_id: The job ID to cancel + wait_for_completion: Whether to wait for the cancellation to complete + """ + raise Exception("cancelJob() is no longer supported. Use cancel_job(job_id, wait_for_completion=False) instead.") - Raises - ------ - ValueError - If only one of ``algo_id`` or ``version`` is provided. Both must - be provided together or neither should be provided. + def cancel_job(self, job_id, wait_for_completion=False): + """ + Cancel a running OGC job or delete a queued job + :param job_id: The job ID to cancel + :param wait_for_completion: Whether to wait for the cancellation to complete + :return: Response object with cancellation status + """ + url = os.path.join(self.config.jobs_ogc, str(job_id)) + params = {} + if wait_for_completion: + params['wait_for_completion'] = str(wait_for_completion).lower() + + headers = self._get_api_header() + logger.debug('DELETE request sent to {}'.format(url)) - Examples - -------- - List recent jobs:: + response = requests.delete( + url=url, + headers=headers, + params=params + ) + return response - >>> response = maap.listJobs(page_size=20) - >>> jobs = response.json() - >>> for job in jobs: - ... 
print(f"{job['job_id']}: {job['status']}") + def getJobResult(self): + """ + Get the results of a completed OGC job. - Filter by algorithm and version:: + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: - >>> response = maap.listJobs( - ... algo_id='my_algorithm', - ... version='main', - ... status='Succeeded' - ... ) + get_job_result(job_id) - Paginate through results:: + job_id: The job ID to get results for + """ + raise Exception("getJobResult() is no longer supported. Use get_job_result(job_id) instead.") - >>> response = maap.listJobs(offset=0, page_size=10) - >>> # Get next page - >>> response = maap.listJobs(offset=10, page_size=10) + def get_job_result(self, job_id): + """ + Get the results of a completed OGC job + :param job_id: The job ID to get results for + :return: Response object with job results + """ + url = os.path.join(self.config.jobs_ogc, str(job_id), 'results') + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(url)) + response = requests.get( + url=url, + headers=headers + ) + return response - Filter by time range:: + def listJobs(self): + """ + Returns a list of jobs for a given user that matches query params provided. - >>> response = maap.listJobs( - ... start_time='2024-01-01', - ... end_time='2024-01-31' - ... ) + DEPRECATION NOTICE: This method is no longer supported. 
Use the following instead: - See Also - -------- - :meth:`getJob` : Get details of a specific job - :meth:`submitJob` : Submit a new job - """ - url = "/".join( - segment.strip("/") - for segment in (self.config.dps_job, endpoints.DPS_JOB_LIST) - ) + list_jobs(*, process_id=None, limit=None, get_job_details=True, offset=0, page_size=10, queue=None, status=None, tag=None, min_duration=None, max_duration=None, type=None, datetime=None, priority=None) + process_id (id, optional): Algorithm ID to only show jobs submitted for this algorithm + limit (int, optional): Limit of jobs to send back + get_job_details (bool, optional): Flag that determines whether to return a detailed job list or a compact list containing just the job ids and their associated job tags. Default is True. + offset (int, optional): Offset for pagination. Default is 0. + page_size (int, optional): Page size for pagination. Default is 10. + queue (str, optional): Job processing resource. + status (str, optional): Job status, e.g. job-completed, job-failed, job-started, job-queued. + tag (str, optional): User job tag/identifier. + min_duration (int, optional): Minimum duration in seconds + max_duration (int, optional): Maximum duration in seconds + type (str, optional): Type, available values: process + datetime (str, optional): Either a date-time or an interval, half-bounded or bounded. Date and time expressions adhere to RFC 3339. Half-bounded intervals are expressed using double-dots. + priority (int, optional): Job priority, 0-9 + """ + raise Exception("listJobs() is no longer supported. Use list_jobs() with appropriate keyword arguments instead.") + + def list_jobs(self, *, + process_id=None, + limit=None, + get_job_details=True, + offset=0, + page_size=10, + queue=None, + status=None, + tag=None, + min_duration=None, + max_duration=None, + type=None, + datetime=None, + priority=None): + """ + Returns a list of jobs for a given user that matches query params provided. 
+ + Args: + process_id (id, optional): Algorithm ID to only show jobs submitted for this algorithm + limit (int, optional): Limit of jobs to send back + get_job_details (bool, optional): Flag that determines whether to return a detailed job list or a compact list containing just the job ids and their associated job tags. Default is True. + offset (int, optional): Offset for pagination. Default is 0. + page_size (int, optional): Page size for pagination. Default is 10. + queue (str, optional): Job processing resource. + status (str, optional): Job status, e.g. job-completed, job-failed, job-started, job-queued. + tag (str, optional): User job tag/identifier. + min_duration (int, optional): Minimum duration in seconds + max_duration (int, optional): Maximum duration in seconds + type (str, optional): Type, available values: process + datetime (str, optional): Either a date-time or an interval, half-bounded or bounded. Date and time expressions adhere to RFC 3339. Half-bounded intervals are expressed using double-dots. + priority (int, optional): Job priority, 0-9 + + Returns: + list: List of jobs for a given user that matches query params provided. + + Raises: + ValueError: If either algo_id or version is provided, but not both. + """ params = { k: v for k, v in ( - ("algo_id", algo_id), - ("end_time", end_time), - ("get_job_details", get_job_details), + ("processID", process_id), + ("limit", limit), + ("getJobDetails", get_job_details), ("offset", offset), - ("page_size", page_size), + ("pageSize", page_size), ("queue", queue), - ("start_time", start_time), ("status", status), ("tag", tag), - ("version", version), + ("minDuration", min_duration), + ("maxDuration", max_duration), + ("type", type), + ("datetime", datetime), + ("priority", priority), ) if v is not None } - - if (not algo_id) != (not version): - # Either algo_id or version was supplied as a non-empty string, but not both. - # Either both must be non-empty strings or both must be None. 
- raise ValueError("Either supply non-empty strings for both algo_id and version, or supply neither.") - - # DPS requests use 'job_type', which is a concatenation of 'algo_id' and 'version' - if algo_id and version: - params['job_type'] = f"{algo_id}:{version}" - - algo_id = params.pop('algo_id', None) - version = params.pop('version', None) - - if status is not None: - params['status'] = job.validate_job_status(status) + url = os.path.join(self.config.jobs_ogc) headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(url)) - logger.debug('headers:') - logger.debug(headers) response = requests.get( url=url, headers=headers, - params=params, + params=params ) return response - def submitJob(self, identifier, algo_id, version, queue, retrieve_attributes=False, **kwargs): - """ - Submit a job to the MAAP Data Processing System (DPS). - - Submits an algorithm for execution on the DPS infrastructure with the - specified parameters and compute resources. - - Parameters - ---------- - identifier : str - A user-defined tag or identifier for the job. Used for tracking - and organizing jobs. - algo_id : str - The algorithm name to execute. - version : str - The algorithm version (e.g., Git branch or tag). - queue : str - The compute queue/resource to use (e.g., ``'maap-dps-worker-8gb'``). - Use :meth:`getQueues` to list available queues. - retrieve_attributes : bool, optional - If ``True``, immediately retrieve job status after submission. - Default is ``False``. - **kwargs : dict - Algorithm input parameters. Parameter names must match those - defined in the algorithm registration. - - Returns - ------- - DPSJob - A :class:`~maap.dps.dps_job.DPSJob` object representing the - submitted job. Use the job's methods to monitor status and - retrieve results. - - Examples - -------- - Submit a basic job:: - - >>> job = maap.submitJob( - ... identifier='my_analysis_run', - ... algo_id='my_algorithm', - ... version='main', - ... 
queue='maap-dps-worker-8gb', - ... input_file='s3://bucket/input.tif' - ... ) - >>> print(f"Job ID: {job.id}") - - Submit with multiple parameters:: - - >>> job = maap.submitJob( - ... identifier='batch_processing', - ... algo_id='processor', - ... version='v2.0', - ... queue='maap-dps-worker-32gb', - ... input_granule='s3://bucket/data.h5', - ... output_format='geotiff', - ... resolution=30 - ... ) - - Submit and immediately get status:: - - >>> job = maap.submitJob( - ... identifier='urgent_job', - ... algo_id='my_algorithm', - ... version='main', - ... queue='maap-dps-worker-8gb', - ... retrieve_attributes=True - ... ) - >>> print(f"Status: {job.status}") - - Monitor job completion:: - - >>> job = maap.submitJob(...) - >>> job.wait_for_completion() - >>> print(f"Final status: {job.status}") - >>> print(f"Outputs: {job.outputs}") - - Notes - ----- - - The job executes asynchronously; this method returns immediately - after submission. - - Use :meth:`~maap.dps.dps_job.DPSJob.wait_for_completion` to block - until the job finishes. - - Input parameters with ``download=True`` in the algorithm config - will be downloaded to the job's working directory. 
- - See Also - -------- - :meth:`getJob` : Retrieve job information - :meth:`listJobs` : List submitted jobs - :meth:`cancelJob` : Cancel a running job - :meth:`getQueues` : List available queues - :class:`~maap.dps.dps_job.DPSJob` : Job management class - """ - # Note that this is temporary and will be removed when we remove the API not requiring username to submit a job - # Also this now overrides passing someone else's username into submitJob since we don't want to allow that - if self.profile is not None and self.profile.account_info() is not None and 'username' in self.profile.account_info().keys(): - kwargs['username'] = self.profile.account_info()['username'] - response = self._DPS.submit_job(request_url=self.config.dps_job, - identifier=identifier, algo_id=algo_id, version=version, queue=queue, **kwargs) - job = DPSJob(self.config) - job.set_submitted_job_result(response) - try: - if retrieve_attributes: - job.retrieve_attributes() - except: - logger.debug(f"Unable to retrieve attributes for job: {job}") - return job - - def uploadFiles(self, filenames): - """ - Upload files to MAAP shared storage. - - Uploads local files to an S3 staging directory where they can be - accessed by other MAAP users or used as inputs to DPS jobs. - - Parameters - ---------- - filenames : list of str - List of local file paths to upload. - - Returns - ------- - str - A message containing the UUID of the upload directory. This UUID - is needed to share the files with other users. - - Examples - -------- - Upload files to share:: - - >>> result = maap.uploadFiles(['data.csv', 'config.json']) - >>> print(result) - Upload file subdirectory: a1b2c3d4-e5f6-... (keep a record of...) 
- - Upload a single file:: - - >>> result = maap.uploadFiles(['output.tif']) - - Notes - ----- - - Files are uploaded to a unique subdirectory identified by a UUID - - Save the UUID to share the upload location with collaborators - - The upload location can be used as input to DPS jobs - - See Also - -------- - :meth:`submitJob` : Use uploaded files as job inputs - """ - bucket = self.config.s3_user_upload_bucket - prefix = self.config.s3_user_upload_dir - uuid_dir = uuid.uuid4() - for filename in filenames: - basename = os.path.basename(filename) - response = self._upload_s3(filename, bucket, f"{prefix}/{uuid_dir}/{basename}") - return f"Upload file subdirectory: {uuid_dir} (keep a record of this if you want to share these files with other users)" - - def _get_browse(self, granule_ur): + def getJobMetrics(self): """ - Get browse image metadata for a granule. + Get metrics for an OGC job. - Internal method to retrieve browse image information for visualization. + DEPRECATION NOTICE: This method is no longer supported. Use the following instead: - Parameters - ---------- - granule_ur : str - The Granule Universal Reference identifier. + get_job_metrics(job_id) - Returns - ------- - requests.Response - HTTP response containing browse image metadata. + job_id: The job ID to get metrics for """ - response = requests.get( - url=f'{self.config.wmts}/GetTile', - params=dict(granule_ur=granule_ur), - headers=dict(Accept='application/json') - ) - return response + raise Exception("getJobMetrics() is no longer supported. Use get_job_metrics(job_id) instead.") - def _get_capabilities(self, granule_ur): + def get_job_metrics(self, job_id): """ - Get WMTS capabilities for a granule. - - Internal method to retrieve Web Map Tile Service capabilities - for visualization. - - Parameters - ---------- - granule_ur : str - The Granule Universal Reference identifier. - - Returns - ------- - requests.Response - HTTP response containing WMTS capabilities XML. 
+ Get metrics for an OGC job + :param job_id: The job ID to get metrics for + :return: Response object with job metrics """ + url = os.path.join(self.config.jobs_ogc, str(job_id), 'metrics') + headers = self._get_api_header() + logger.debug('GET request sent to {}'.format(url)) response = requests.get( - url=f'{self.config.wmts}/GetCapabilities', - params=dict(granule_ur=granule_ur), - headers=dict(Accept='application/json') + url=url, + headers=headers ) return response - def show(self, granule, display_config={}): - """ - Display a granule on an interactive map. - - Renders the granule data as a tile layer on an interactive Mapbox - map in a Jupyter notebook environment. - - Parameters - ---------- - granule : dict - A granule result dictionary, typically obtained from - :meth:`searchGranule`. Must contain ``Granule.GranuleUR``. - display_config : dict, optional - Configuration options for rendering. Common options include: - - rescale : str - Value range for color scaling (e.g., ``'0,70'``). - color_map : str - Color palette name (e.g., ``'schwarzwald'``). - - Examples - -------- - Display a granule on a map:: - - >>> granules = maap.searchGranule(short_name='AFLVIS2', limit=1) - >>> maap.show(granules[0]) - - Display with custom rendering:: - - >>> maap.show(granule, display_config={ - ... 'rescale': '0,100', - ... 'color_map': 'viridis' - ... 
}) - - Notes - ----- - - Requires ``mapboxgl`` package and a Jupyter notebook environment - - Uses the MAAP tile server for rendering - - A Mapbox access token must be configured - - See Also - -------- - :meth:`searchGranule` : Search for granules to visualize - """ - from mapboxgl.viz import RasterTilesViz - - granule_ur = granule['Granule']['GranuleUR'] - browse_file = json.loads(self._get_browse(granule_ur).text)['browse'] - capabilities = json.loads(self._get_capabilities(granule_ur).text)['body'] - presenter = Presenter(capabilities, display_config) - query_params = dict(url=browse_file, **presenter.display_config) - qs = urllib.parse.urlencode(query_params) - tiles_url = f"{self.config.tiler_endpoint}/tiles/{{z}}/{{x}}/{{y}}.png?{qs}" - viz = RasterTilesViz( - tiles_url, - height='800px', - zoom=10, - access_token=self.config.mapbox_token, - tiles_size=256, - tiles_bounds=presenter.bbox, - center=(presenter.lng, presenter.lat), - tiles_minzoom=presenter.minzoom, - tiles_maxzoom=presenter.maxzoom, - ) - viz.show() - if __name__ == "__main__": print("initialized") \ No newline at end of file diff --git a/maap/utils/CMR.py b/maap/utils/CMR.py index 18ebf94..665228f 100644 --- a/maap/utils/CMR.py +++ b/maap/utils/CMR.py @@ -129,7 +129,7 @@ def generateGranuleCallFromEarthDataRequest(self, query, variable_name='maap', l params.append("limit=" + str(limit)) - result = variable_name + ".searchGranule(" + ", ".join(params) + ")" + result = variable_name + ".search_granule(" + ", ".join(params) + ")" return result @@ -157,7 +157,7 @@ def generateCallFromEarthDataQueryString(self, search_url, variable_name='maap', # e.g., # granules?collection_concept_id[]=C1&collection_concept_id[]=C2 # will be converted to - # maap.searchGranule(collection_concept_id="C1|C2") + # maap.search_granule(collection_concept_id="C1|C2") if any(x for x in params if x.startswith(p_key_assignment)): params[i - 1] = params[i - 1].replace(p_key_assignment, p_key_assignment + p_val + "|") else: 
@@ -167,8 +167,8 @@ def generateCallFromEarthDataQueryString(self, search_url, variable_name='maap', params.append("limit=" + str(limit)) if search == 'granule': - result = variable_name + ".searchGranule(" + ", ".join(params) + ")" + result = variable_name + ".search_granule(" + ", ".join(params) + ")" else: - result = variable_name + ".searchCollection(" + ", ".join(params) + ")" + result = variable_name + ".search_collection(" + ", ".join(params) + ")" return result diff --git a/maap/utils/algorithm_utils.py b/maap/utils/algorithm_utils.py index db12dd6..8c652ee 100644 --- a/maap/utils/algorithm_utils.py +++ b/maap/utils/algorithm_utils.py @@ -8,7 +8,6 @@ except ImportError: from yaml import Loader, Dumper - logger = logging.getLogger(__name__) @@ -18,6 +17,5 @@ def read_yaml_file(algo_yaml): algo_config = yaml_load(fr, Loader=Loader) return validate_algorithm_config(algo_config) - def validate_algorithm_config(algo_config): return algo_config diff --git a/pyproject.toml b/pyproject.toml index 701a5c3..9ab29f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "maap-py" -version = "4.3.0a2" +version = "5.1.0a2" description = "Python client API for interacting with the NASA MAAP API" repository = "https://github.com/MAAP-Project/maap-py" authors = ["Jet Propulsion Laboratory "] diff --git a/test/functional_test.py b/test/functional_test.py index de55154..d2ef8b7 100644 --- a/test/functional_test.py +++ b/test/functional_test.py @@ -79,7 +79,7 @@ def submit_job(maap: MAAP, wait_for_completion=False, queue="maap-dps-worker-8gb "output_filename": "output.tif", "outsize": "20" } - job = maap.submitJob(identifier="maap_functional_test", + job = maap.submit_job(identifier="maap_functional_test", algo_id=algo_name, version=algo_version, queue=queue, diff --git a/test/test_CMR.py b/test/test_CMR.py index 3a9ad33..d8878e7 100644 --- a/test/test_CMR.py +++ b/test/test_CMR.py @@ -16,79 +16,79 @@ def setUpClass(cls): cls._test_ur = 
'uavsar_AfriSAR_v1-cov_lopenp_14043_16008_140_001_160225-geo_cov_4-4.bin' cls._test_site_name = 'lope' - def test_searchGranuleByInstrumentAndTrackNumber(self): - results = self.maap.searchGranule( + def test_search_granule_by_instrument_and_track_number(self): + results = self.maap.search_granule( instrument=self._test_instrument_name_uavsar, track_number=self._test_track_number, polarization='HH') self.assertTrue('concept-id' in results[0].keys()) - def test_searchGranuleByGranuleUR(self): - results = self.maap.searchGranule( + def test_search_granule_by_granule_ur(self): + results = self.maap.search_granule( granule_ur=self._test_ur) self.assertTrue('concept-id' in results[0].keys()) - def test_granuleDownload(self): - results = self.maap.searchGranule( + def test_granule_download(self): + results = self.maap.search_granule( granule_ur=self._test_ur) - download = results[0].getLocalPath('/Users/satorius/source') + download = results[0].get_local_path('/Users/satorius/source') self.assertTrue(len(download) > 0) - def test_granuleDownloadExternalDAAC(self): - # results = self.maap.searchGranule( + def test_granule_download_external_daac(self): + # results = self.maap.search_granule( # collection_concept_id='C1200231010-NASA_MAAP') - results = self.maap.searchGranule( + results = self.maap.search_granule( cmr_host='cmr.earthdata.nasa.gov', collection_concept_id='C2067521974-ORNL_CLOUD', granule_ur='GEDI_L3_Land_Surface_Metrics.GEDI03_elev_lowestmode_stddev_2019108_2020106_001_08.tif') - download = results[0].getData() + download = results[0].get_data() self.assertTrue(len(download) > 0) - def test_direct_granuleDownload(self): - results = self.maap.downloadGranule( + def test_direct_granule_download(self): + results = self.maap.download_granule( online_access_url='https://datapool.asf.alaska.edu/GRD_HD/SA/S1A_S3_GRDH_1SDH_20140615T034444_20140615T034512_001055_00107C_8977.zip', destination_path='./tmp' ) self.assertTrue(len(results) > 0) - def 
test_searchGranuleByInstrumentAndSiteName(self): - results = self.maap.searchGranule( + def test_search_granule_by_instrument_and_site_name(self): + results = self.maap.search_granule( instrument=self._test_instrument_name_lvis, site_name=self._test_site_name) self.assertTrue('concept-id' in results[0].keys()) - def test_searchGranuleWithPipeDelimiters(self): - results = self.maap.searchGranule( + def test_search_granule_with_pipe_delimiters(self): + results = self.maap.search_granule( instrument="LVIS|UAVSAR", platform="AIRCRAFT") self.assertTrue('concept-id' in results[0].keys()) - def test_searchFromEarthdata(self): - results = self.maap.searchCollection( + def test_search_from_earthdata(self): + results = self.maap.search_collection( instrument="LVIS|UAVSAR", platform="AIRCRAFT|B-200|COMPUTERS", data_center="MAAP Data Management Team|ORNL_DAAC") self.assertTrue('concept-id' in results[0].keys()) - def test_searchCollection(self): - results = self.maap.searchCollection( + def test_search_collection(self): + results = self.maap.search_collection( instrument=self._test_instrument_name_uavsar) self.assertTrue('concept-id' in results[0].keys()) - def test_searchGranuleWithWildcards(self): - results = self.maap.searchGranule(collection_concept_id="C1200110748-NASA_MAAP", + def test_search_granule_with_wildcards(self): + results = self.maap.search_granule(collection_concept_id="C1200110748-NASA_MAAP", readable_granule_name='*185*') self.assertTrue('concept-id' in results[0].keys()) - def test_getUrl(self): - results = self.maap.searchGranule(page_num="1", concept_id="C1214470488-ASF", sort_key="-start_date", limit=1) + def test_get_url(self): + results = self.maap.search_granule(page_num="1", concept_id="C1214470488-ASF", sort_key="-start_date", limit=1) - url = results[0].getHttpUrl() + url = results[0].get_http_url() self.assertTrue(url.startswith("http")) - url = results[0].getS3Url() + url = results[0].get_s3_url() self.assertTrue(url.startswith("s3")) diff --git 
a/test/test_DPS.py b/test/test_DPS.py deleted file mode 100644 index cb6dc47..0000000 --- a/test/test_DPS.py +++ /dev/null @@ -1,61 +0,0 @@ -from unittest import TestCase - -import yaml - -from maap.maap import MAAP -import logging -from yaml import load as yaml_load, dump as yaml_dump -try: - from yaml import CLoader as Loader, CDumper as Dumper -except ImportError: - from yaml import Loader, Dumper - -class TestDPS(TestCase): - logging.basicConfig(level=logging.DEBUG) - logger = logging.getLogger(__name__) - - @classmethod - def setUpClass(cls): - cls.logger.debug("Initializing MAAP") - cls.maap = MAAP() - - def test_registerAlgorithm(self): - self.maap.register_algorithm_from_yaml_file("dps_test_algo_config.yaml") - - def test_deleteAlgorithm(self): - pass - - def test_deleteJob(self): - pass - - def test_describeAlgorithm(self): - pass - - def test_dismissJob(self): - pass - - def test_getJobMetrics(self): - pass - - def test_getJobResult(self): - pass - - def test_getJobStatus(self): - pass - - def test_getQueues(self): - pass - - def test_listAlgorithms(self): - pass - - def test_listJobs(self): - pass - - def test_publishAlgorithm(self): - pass - - - def test_submitJob(self): - pass - diff --git a/test/test_MAAP.py b/test/test_MAAP.py index 5dcc082..aef8238 100644 --- a/test/test_MAAP.py +++ b/test/test_MAAP.py @@ -54,18 +54,18 @@ def test_genFromEarthdata(self): """ var_name = 'maapVar' - testResult = self.maap.getCallFromEarthdataQuery(query=input, variable_name=var_name) + testResult = self.maap.get_call_from_earthdata_query(query=input, variable_name=var_name) self.assertTrue( - testResult == var_name + '.searchGranule('\ + testResult == var_name + '.search_granule('\ 'processing_level_id="1A|1B|2|4", '\ 'instrument="LVIS|UAVSAR", '\ 'platform="AIRCRAFT|B-200|COMPUTERS", '\ 'data_center="MAAP Data Management Team", '\ 'bounding_box="-35.4375,-55.6875,-80.4375,37.6875")') - def test_uploadFiles(self): + def test_upload_files(self): self.maap._upload_s3 = 
MagicMock(return_value=None) - result = self.maap.uploadFiles(['test/s3-upload-testfile1.txt', 'test/s3-upload-testfile2.txt']) + result = self.maap.upload_files(['test/s3-upload-testfile1.txt', 'test/s3-upload-testfile2.txt']) upload_msg_regex = re.compile('Upload file subdirectory: .+ \\(keep a record of this if you want to share these files with other users\\)') self.assertTrue(re.match(upload_msg_regex, result)) diff --git a/test/test_ogc.py b/test/test_ogc.py new file mode 100644 index 0000000..84d3252 --- /dev/null +++ b/test/test_ogc.py @@ -0,0 +1,472 @@ +""" +Test module for algorithm and job functions in maap.py +""" + +import pytest +from maap.maap import MAAP + + +def test_list_algorithms(): + """Test list_algorithms function calls OGC algorithms endpoint and returns 200 with JSON""" + maap = MAAP(maap_host='api.dit.maap-project.org') + + response = maap.list_algorithms() + + # Check that we get a 200 status code + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + + # Check that response is valid JSON + try: + json_data = response.json() + assert isinstance(json_data, (dict, list)), "Response should be valid JSON (dict or list)" + except ValueError as e: + pytest.fail(f"Response is not valid JSON: {e}") + + +def test_register_algorithm(): + """Test register_algorithm function with a valid CWL URL""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + # Test that list_algorithms works first to ensure we have proper authentication + list_response = maap.list_algorithms() + if list_response.status_code != 200: + pytest.skip("Authentication required - skipping register_algorithm test") + except Exception: + pytest.skip("Cannot connect to MAAP API - skipping register_algorithm test") + + # Use a real CWL example URL that should work + sample_cwl_url = "https://raw.githubusercontent.com/MAAP-Project/maap-algorithms/master/examples/hello-world/hello-world.cwl" + + response = maap.register_algorithm(sample_cwl_url) + 
+ # Should get a successful response or a meaningful error + assert response.status_code in [200, 201], f"Expected successful registration, got {response.status_code}: {response.text}" + + # Should return JSON with deployment info + json_data = response.json() + assert isinstance(json_data, dict), "Registration response should be a JSON object" + + # Should contain deployment information + assert "deploymentID" in json_data or "id" in json_data, "Response should contain deployment ID" + + +def test_get_deployment_status(): + """Test get_deployment_status function""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + list_response = maap.list_algorithms() + if list_response.status_code != 200: + pytest.skip("Authentication required - skipping get_deployment_status test") + except Exception: + pytest.skip("Cannot connect to MAAP API - skipping get_deployment_status test") + + # Since we don't have a real deployment ID, this test will likely return 404 + # which is the expected behavior for a non-existent deployment + sample_deployment_id = "test-deployment-123" + + response = maap.get_deployment_status(sample_deployment_id) + + # Should get a valid response - 200 if found, 404 if not found + assert response.status_code in [200, 404], f"Expected 200 or 404, got {response.status_code}: {response.text}" + + # If deployment exists (200), should return JSON with status info + if response.status_code == 200: + json_data = response.json() + assert isinstance(json_data, dict), "Status response should be a JSON object" + assert "status" in json_data, "Response should contain status information" + + # Verify the URL contains the deployment ID + assert str(sample_deployment_id) in response.url + + +def test_describe_algorithm(): + """Test describe_algorithm function by getting algorithm list and describing first algorithm""" + maap = MAAP(maap_host='api.dit.maap-project.org') + + # First get the list of algorithms + list_response = maap.list_algorithms() + assert 
list_response.status_code == 200, f"Failed to get algorithm list: {list_response.status_code}" + + try: + processes_data = list_response.json() + except ValueError as e: + pytest.fail(f"Algorithm list response is not valid JSON: {e}") + + # Check if there are any algorithms + if not processes_data or (isinstance(processes_data, dict) and not processes_data.get('processes')): + pytest.skip("No algorithms available to test describe_algorithm") + + # Get the first algorithm + if isinstance(processes_data, dict) and 'processes' in processes_data: + processes = processes_data['processes'] + else: + processes = processes_data + + if not processes or len(processes) == 0: + pytest.skip("No algorithms available to test describe_algorithm") + + first_process = processes[0] + + # Find the self link or use process ID + process_id = None + if 'links' in first_process: + for link in first_process['links']: + if link.get('rel') == 'self': + href = link.get('href', '') + # Extract process ID from href like /ogc/processes/3 + if '/ogc/processes/' in href: + process_id = href.split('/ogc/processes/')[-1] + break + + # Fall back to process ID field if no self link found + if not process_id and 'id' in first_process: + process_id = first_process['id'] + + if not process_id: + pytest.skip("Could not determine algorithm ID to test describe_algorithm") + + # Now test the describe_algorithm function + describe_response = maap.describe_algorithm(process_id) + + # Check that we get a successful response + assert describe_response.status_code == 200, f"Expected 200, got {describe_response.status_code}" + + # Check that response is valid JSON + try: + describe_data = describe_response.json() + assert isinstance(describe_data, dict), "Describe response should be a JSON object" + except ValueError as e: + pytest.fail(f"Describe response is not valid JSON: {e}") + + # Verify the URL called contains the algorithm ID + assert str(process_id) in describe_response.url + + +def 
test_update_algorithm(): + """Test update_algorithm function""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + list_response = maap.list_algorithms() + if list_response.status_code != 200: + pytest.skip("Authentication required - skipping update_algorithm test") + except Exception: + pytest.skip("Cannot connect to MAAP API - skipping update_algorithm test") + + # Use a non-existent algorithm ID - should return 404 which is expected + sample_process_id = "non-existent-algorithm-123" + sample_cwl_url = "https://raw.githubusercontent.com/MAAP-Project/maap-algorithms/master/examples/hello-world/hello-world.cwl" + + response = maap.update_algorithm(sample_process_id, sample_cwl_url) + + # Should get a valid response - 200 if successful, 404 if not found, 403 if not authorized + assert response.status_code in [200, 404, 403], f"Expected 200, 404, or 403, got {response.status_code}: {response.text}" + + # If successful (200), should return JSON with update info + if response.status_code == 200: + json_data = response.json() + assert isinstance(json_data, dict), "Update response should be a JSON object" + + # Verify the URL contains the process ID + assert str(sample_process_id) in response.url + + +def test_delete_algorithm(): + """Test delete_algorithm function""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + list_response = maap.list_algorithms() + if list_response.status_code != 200: + pytest.skip("Authentication required - skipping delete_algorithm test") + except Exception: + pytest.skip("Cannot connect to MAAP API - skipping delete_algorithm test") + + # Use a non-existent algorithm ID - should return 404 which is expected + sample_process_id = "non-existent-algorithm-123" + + response = maap.delete_algorithm(sample_process_id) + + # Should get a valid response - 200/204 if successful, 404 if not found, 403 if not authorized + assert response.status_code in [200, 204, 404, 403], f"Expected 200, 204, 404, or 403, got 
{response.status_code}: {response.text}" + + # If successful (200/204), response might be empty or contain JSON + if response.status_code in [200, 204]: + if response.content: # Only check JSON if there's content + json_data = response.json() + assert isinstance(json_data, dict), "Delete response should be a JSON object" + + # Verify the URL contains the process ID + assert str(sample_process_id) in response.url + + +def test_get_algorithm_package(): + """Test get_algorithm_package function""" + maap = MAAP(maap_host='api.dit.maap-project.org') + + # First get the list of algorithms + list_response = maap.list_algorithms() + assert list_response.status_code == 200, f"Failed to get algorithm list: {list_response.status_code}" + + try: + processes_data = list_response.json() + except ValueError as e: + pytest.fail(f"Algorithm list response is not valid JSON: {e}") + + # Check if there are any algorithms + if not processes_data or (isinstance(processes_data, dict) and not processes_data.get('processes')): + pytest.skip("No algorithms available to test get_algorithm_package") + + # Get the first algorithm + if isinstance(processes_data, dict) and 'processes' in processes_data: + processes = processes_data['processes'] + else: + processes = processes_data + + if not processes or len(processes) == 0: + pytest.skip("No algorithms available to test get_algorithm_package") + + first_process = processes[0] + + # Find the self link or use process ID + process_id = None + if 'links' in first_process: + for link in first_process['links']: + if link.get('rel') == 'self': + href = link.get('href', '') + # Extract process ID from href like /ogc/processes/3 + if '/ogc/processes/' in href: + process_id = href.split('/ogc/processes/')[-1] + break + + # Fall back to process ID field if no self link found + if not process_id and 'id' in first_process: + process_id = first_process['id'] + + if not process_id: + pytest.skip("Could not determine algorithm ID to test 
get_algorithm_package") + + # Now test the package_response function + package_response = maap.get_algorithm_package(process_id) + + # Check that we get a successful response + assert package_response.status_code == 200, f"Expected 200, got {package_response.status_code}" + + # Check that response is valid JSON + try: + package_data = package_response.json() + assert isinstance(package_data, dict), "Algorithm Package response should be a JSON object" + except ValueError as e: + pytest.fail(f"Algorithm package response is not valid JSON: {e}") + + # Verify the URL called contains the algorithm ID + assert str(process_id) in package_response.url + + +def test_submit_job(): + """Test submit_job function by first getting a real algorithm ID""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + list_response = maap.list_algorithms() + if list_response.status_code != 200: + pytest.skip("Authentication required - skipping submit_job test") + except Exception: + pytest.skip("Cannot connect to MAAP API - skipping submit_job test") + + # Get a real algorithm to test with + try: + algorithms_data = list_response.json() + if not algorithms_data or (isinstance(algorithms_data, dict) and not algorithms_data.get('processes')): + pytest.skip("No algorithms available to test submit_job") + + if isinstance(algorithms_data, dict) and 'processes' in algorithms_data: + algorithms = algorithms_data['processes'] + else: + algorithms = algorithms_data + + if not algorithms or len(algorithms) == 0: + pytest.skip("No algorithms available to test submit_job") + + # Get the first algorithm's ID + first_algorithm = algorithms[0] + algorithm_id = first_algorithm.get('id') or first_algorithm.get('processId') + + if not algorithm_id: + pytest.skip("Could not determine algorithm ID to test submit_job") + + except Exception as e: + pytest.skip(f"Could not parse algorithms list: {e}") + + # Test job submission with minimal inputs + sample_inputs = {} # Empty inputs for basic test + 
sample_queue = "maap-dps-worker-32gb" # Use a real queue name + + response = maap.submit_job(algorithm_id, sample_inputs, sample_queue) + + # Should get a response - 200/201 if successful, 400 if invalid inputs, 404 if algorithm not found + assert response.status_code in [200, 201, 400, 404], f"Expected valid response, got {response.status_code}: {response.text}" + + # If successful (200/201), should return JSON with job info + if response.status_code in [200, 201]: + json_data = response.json() + assert isinstance(json_data, dict), "Job submission response should be a JSON object" + assert "jobID" in json_data or "id" in json_data, "Response should contain job ID" + + # Verify the URL contains the algorithm ID + assert str(algorithm_id) in response.url + + +def test_get_job_status(): + """Test get_job_status function""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + list_response = maap.list_jobs() + if list_response.status_code != 200: + pytest.skip("Authentication required - skipping get_job_status test") + except Exception: + pytest.skip("Cannot connect to MAAP API - skipping get_job_status test") + + # Use a non-existent job ID - should return 404 which is expected + sample_job_id = "non-existent-job-123" + + response = maap.get_job_status(sample_job_id) + + # Should get a valid response - 200 if found, 404 if not found + assert response.status_code in [200, 404], f"Expected 200 or 404, got {response.status_code}: {response.text}" + + # If job exists (200), should return JSON with status info + if response.status_code == 200: + json_data = response.json() + assert isinstance(json_data, dict), "Job status response should be a JSON object" + assert "status" in json_data, "Response should contain status information" + + # Verify the URL contains the job ID + assert str(sample_job_id) in response.url + + +def test_cancel_job(): + """Test cancel_job function""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + list_response = 
maap.list_jobs() + if list_response.status_code != 200: + pytest.skip("Authentication required - skipping cancel_job test") + except Exception: + pytest.skip("Cannot connect to MAAP API - skipping cancel_job test") + + # Use a non-existent job ID - should return 404 which is expected + sample_job_id = "non-existent-job-123" + + response = maap.cancel_job(sample_job_id) + + # Should get a valid response - 200/204 if successful, 404 if not found, 409 if already completed + assert response.status_code in [200, 204, 404, 409], f"Expected 200, 204, 404, or 409, got {response.status_code}: {response.text}" + + # If successful (200/204), response might be empty or contain JSON + if response.status_code in [200, 204]: + if response.content: # Only check JSON if there's content + json_data = response.json() + assert isinstance(json_data, dict), "Cancel response should be a JSON object" + + # Verify the URL contains the job ID + assert str(sample_job_id) in response.url + + +def test_get_job_result(): + """Test get_job_result function""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + list_response = maap.list_jobs() + if list_response.status_code != 200: + pytest.skip("Authentication required - skipping get_job_result test") + except Exception: + pytest.skip("Cannot connect to MAAP API - skipping get_job_result test") + + # Use a non-existent job ID - should return 404 which is expected + sample_job_id = "non-existent-job-123" + + response = maap.get_job_result(sample_job_id) + + # Should get a valid response - 200 if found, 404 if not found, 425 if not ready + assert response.status_code in [200, 404, 425], f"Expected 200, 404, or 425, got {response.status_code}: {response.text}" + + # If job results exist (200), should return JSON with result info + if response.status_code == 200: + json_data = response.json() + assert isinstance(json_data, dict), "Job result response should be a JSON object" + # Should contain outputs or links to result files + + # 
Verify the URL contains the job ID and 'results' + assert str(sample_job_id) in response.url + assert 'results' in response.url + + +def test_list_jobs(): + """Test list_jobs function""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + response = maap.list_jobs() + if response.status_code != 200: + pytest.skip("Authentication required - skipping get_job_result test") + except Exception: + pytest.skip("Cannot connect to MAAP API - skipping get_job_result test") + + # Only check JSON content if we get a successful response + if response.status_code == 200: + json_data = response.json() + assert isinstance(json_data, (dict, list)), "Jobs list response should be JSON (dict or list)" + + # If it's a dict, it might have a 'jobs' key or similar + if isinstance(json_data, dict): + # Common structures: {"jobs": [...]} or {"processes": [...]} + assert len(json_data) >= 0, "Jobs response should be valid" + elif isinstance(json_data, list): + # Direct list of jobs + assert len(json_data) >= 0, "Jobs list should be valid" + + +def test_get_job_metrics(): + """Test get_job_metrics function""" + maap = MAAP() + + # Skip test if we can't authenticate + try: + list_response = maap.list_jobs() + if list_response.status_code != 200: + pytest.skip("Authentication required - skipping get_job_metrics test") + except Exception: + pytest.skip("Cannot connect to MAAP API - skipping get_job_metrics test") + + # Use a non-existent job ID - should return 404 which is expected + sample_job_id = "non-existent-job-123" + + response = maap.get_job_metrics(sample_job_id) + + # Should get a valid response - 200 if found, 404 if not found, 425 if not available + assert response.status_code in [200, 404, 425], f"Expected 200, 404, or 425, got {response.status_code}: {response.text}" + + # If job metrics exist (200), should return JSON with metrics info + if response.status_code == 200: + json_data = response.json() + assert isinstance(json_data, dict), "Job metrics response should be 
a JSON object" + # Should contain metrics like CPU usage, memory usage, duration, etc. + + # Verify the URL contains the job ID and 'metrics' + assert str(sample_job_id) in response.url + assert 'metrics' in response.url \ No newline at end of file