Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 274 additions & 1 deletion maap/maap.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,26 @@ def _upload_s3(self, filename, bucket, objectKey):
It uses the boto3 S3 client configured at module level.
"""
return s3_client.upload_file(filename, bucket, objectKey)

def searchGranule(self):
"""
Search for granules in the CMR (Common Metadata Repository).

Queries the CMR database for granules matching the specified criteria.
Granules represent individual data files within a collection.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

search_granule(limit=20, **kwargs)

limit : int, optional
Maximum number of results to return. Default is 20.
**kwargs : dict
Search parameters to filter results. Common parameters include:

"""
raise Exception("searchGranule() is no longer supported. Use search_granule(limit=20, **kwargs) instead.")


def search_granule(self, limit=20, **kwargs):
"""
Expand Down Expand Up @@ -327,6 +347,31 @@ def search_granule(self, limit=20, **kwargs):
self.config.search_granule_url,
self._get_api_header(),
self._DPS) for result in results][:limit]


def downloadGranule(self):
"""
Download a granule directly from an HTTP URL.

Downloads data from an Earthdata HTTP URL, handling both public and
protected (authenticated) resources automatically.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

download_granule(online_access_url, destination_path=".", overwrite=False)

online_access_url : str
The HTTP URL of the granule to download. This is typically obtained
from a granule's ``OnlineAccessURL`` field.
destination_path : str, optional
Directory path where the file will be saved. Default is the current
working directory (``'.'``).
overwrite : bool, optional
If ``True``, overwrite existing files. If ``False`` (default), skip
download if the file already exists.

"""
raise Exception("downloadGranule() is no longer supported. Use download_granule(online_access_url, destination_path=\".\", overwrite=False) instead.")

def download_granule(self, online_access_url, destination_path=".", overwrite=False):
"""
Expand Down Expand Up @@ -396,6 +441,30 @@ def download_granule(self, online_access_url, destination_path=".", overwrite=Fa
proxy._apiHeader = self._get_api_header()
# noinspection PyProtectedMember
return proxy._getHttpData(online_access_url, overwrite, final_destination)


def getCallFromEarthdataQuery(self):
"""
Generate a MAAP API call string from an Earthdata search query.

Converts a JSON-formatted Earthdata search query into a Python code
string that can be used to call the MAAP API.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

get_call_from_earthdata_query(query, variable_name='maap', limit=1000)

query : str
A JSON-formatted string representing an Earthdata search query.
This is the format used by the Earthdata Search application.
variable_name : str, optional
The variable name to use in the generated code for the MAAP
client instance. Default is ``'maap'``.
limit : int, optional
Maximum number of records to return. Default is 1000.

"""
raise Exception("getCallFromEarthdataQuery() is no longer supported. Use get_call_from_earthdata_query(query, variable_name='maap', limit=1000) instead.")

def get_call_from_earthdata_query(self, query, variable_name='maap', limit=1000):
"""
Expand Down Expand Up @@ -443,6 +512,28 @@ def get_call_from_earthdata_query(self, query, variable_name='maap', limit=1000)
"""
return self._CMR.generateGranuleCallFromEarthDataRequest(query, variable_name, limit)

def getCallFromCmrUri(self):
"""
Generate a MAAP API call string from a CMR REST API URL.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

get_call_from_cmr_uri(search_url, variable_name='maap', limit=1000, search='granule')

search_url : str
A CMR REST API search URL. This can be copied directly from
the CMR API or browser address bar.
variable_name : str, optional
The variable name to use in the generated code for the MAAP
client instance. Default is ``'maap'``.
limit : int, optional
Maximum number of records to return. Default is 1000.
search : str, optional
Type of search to perform. Either ``'granule'`` (default) or
``'collection'``.
"""
raise Exception("getCallFromCmrUri() is no longer supported. Use get_call_from_cmr_uri(search_url, variable_name='maap', limit=1000, search='granule') instead.")

def get_call_from_cmr_uri(self, search_url, variable_name='maap', limit=1000, search='granule'):
"""
Generate a MAAP API call string from a CMR REST API URL.
Expand Down Expand Up @@ -500,6 +591,38 @@ def get_call_from_cmr_uri(self, search_url, variable_name='maap', limit=1000, se
"""
return self._CMR.generateCallFromEarthDataQueryString(search_url, variable_name, limit, search)

def searchCollection(self):
"""
Search for collections in the CMR (Common Metadata Repository).

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

search_collection(limit=100, **kwargs)

limit : int, optional
Maximum number of results to return. Default is 100.
**kwargs : dict
Search parameters to filter results. Common parameters include:

short_name : str
Collection short name (e.g., 'GEDI02_A').
concept_id : str
Unique CMR collection identifier.
provider : str
Data provider (e.g., 'MAAP', 'LPDAAC_ECS').
keyword : str
Keyword search across collection metadata.
instrument : str
Filter by instrument name.
platform : str
Filter by platform name.
project : str
Filter by project name.
processing_level_id : str
Filter by data processing level.
"""
raise Exception("searchCollection() is no longer supported. Use search_collection(limit=100, **kwargs) instead.")

def search_collection(self, limit=100, **kwargs):
"""
Search for collections in the CMR (Common Metadata Repository).
Expand Down Expand Up @@ -572,6 +695,20 @@ def search_collection(self, limit=100, **kwargs):
"""
results = self._CMR.get_search_results(url=self.config.search_collection_url, limit=limit, **kwargs)
return [Collection(result, self.config.maap_host) for result in results][:limit]

def getQueues(self):
"""
Get available DPS processing queues (resources).

Retrieves a list of available compute resources (queues) that can be
used for algorithm execution. Different queues provide different
amounts of memory and CPU.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

get_queues()
"""
raise Exception("getQueues() is no longer supported. Use get_queues() instead.")

def get_queues(self):
"""
Expand Down Expand Up @@ -635,7 +772,7 @@ def deploy_algorithm_from_cwl_file(self, file_path):
json=process
)
return response

def replace_algorithm_from_cwl_file(self, process_id, file_path):
"""
Deploys an algorithm from a CWL file
Expand All @@ -656,6 +793,19 @@ def replace_algorithm_from_cwl_file(self, process_id, file_path):
)
return response

def uploadFiles(self):
"""
Upload files to MAAP shared storage.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

upload_files(filenames)

filenames : list of str
List of local file paths to upload.
"""
raise Exception("uploadFiles() is no longer supported. Use upload_files(filenames) instead.")

def upload_files(self, filenames):
"""
Upload files to MAAP shared storage.
Expand Down Expand Up @@ -818,6 +968,16 @@ def show(self, granule, display_config={}):
viz.show()

# OGC-compliant endpoint functions
def listAlgorithms(self):
"""
Search all OGC processes.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

list_algorithms()
"""
raise Exception("listAlgorithms() is no longer supported. Use list_algorithms() instead.")

def list_algorithms(self):
"""
Search all OGC processes
Expand Down Expand Up @@ -867,6 +1027,18 @@ def get_deployment_status(self, deployment_id):
)
return response

def describeAlgorithm(self):
"""
Get detailed information about a specific OGC process.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

describe_algorithm(process_id)

process_id: The process ID to describe
"""
raise Exception("describeAlgorithm() is no longer supported. Use describe_algorithm(process_id) instead.")

def describe_algorithm(self, process_id):
"""
Get detailed information about a specific OGC process
Expand Down Expand Up @@ -903,6 +1075,18 @@ def update_algorithm(self, process_id, execution_unit_href):
)
return response

def deleteAlgorithm(self):
"""
Delete an existing OGC process (must be the original deployer).

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

delete_algorithm(process_id)

process_id: The process ID to delete
"""
raise Exception("deleteAlgorithm() is no longer supported. Use delete_algorithm(process_id) instead.")

def delete_algorithm(self, process_id):
"""
Delete an existing OGC process (must be the original deployer)
Expand Down Expand Up @@ -932,6 +1116,22 @@ def get_algorithm_package(self, process_id):
headers=headers
)
return response

def submitJob(self):
"""
Submit a job to the MAAP Data Processing System (DPS).

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

submit_job(process_id, inputs, queue)

process_id: The process ID to execute
inputs: Dictionary of input parameters for the process
queue: Queue to run the job on
"""
raise Exception("submitJob() is no longer supported. Use submit_job(process_id, inputs, queue) instead.")



def submit_job(self, process_id, inputs, queue, dedup=None, tag=None):
"""
Expand Down Expand Up @@ -963,6 +1163,18 @@ def submit_job(self, process_id, inputs, queue, dedup=None, tag=None):
)
return response

def getJobStatus(self):
"""
Get the status of an OGC job.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

get_job_status(job_id)

job_id: The job ID to check status for
"""
raise Exception("getJobStatus() is no longer supported. Use get_job_status(job_id) instead.")

def get_job_status(self, job_id):
"""
Get the status of an OGC job
Expand All @@ -979,6 +1191,19 @@ def get_job_status(self, job_id):
)
return response

def cancelJob(self):
"""
Cancel a running OGC job or delete a queued job.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

cancel_job(job_id, wait_for_completion=False)

job_id: The job ID to cancel
wait_for_completion: Whether to wait for the cancellation to complete
"""
raise Exception("cancelJob() is no longer supported. Use cancel_job(job_id, wait_for_completion=False) instead.")

def cancel_job(self, job_id, wait_for_completion=False):
"""
Cancel a running OGC job or delete a queued job
Expand All @@ -1001,6 +1226,18 @@ def cancel_job(self, job_id, wait_for_completion=False):
)
return response

def getJobResult(self):
"""
Get the results of a completed OGC job.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

get_job_result(job_id)

job_id: The job ID to get results for
"""
raise Exception("getJobResult() is no longer supported. Use get_job_result(job_id) instead.")

def get_job_result(self, job_id):
"""
Get the results of a completed OGC job
Expand All @@ -1016,6 +1253,30 @@ def get_job_result(self, job_id):
)
return response

def listJobs(self):
"""
Returns a list of jobs for a given user that matches query params provided.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

list_jobs(*, process_id=None, limit=None, get_job_details=True, offset=0, page_size=10, queue=None, status=None, tag=None, min_duration=None, max_duration=None, type=None, datetime=None, priority=None)

process_id (id, optional): Algorithm ID to only show jobs submitted for this algorithm
limit (int, optional): Limit of jobs to send back
get_job_details (bool, optional): Flag that determines whether to return a detailed job list or a compact list containing just the job ids and their associated job tags. Default is True.
offset (int, optional): Offset for pagination. Default is 0.
page_size (int, optional): Page size for pagination. Default is 10.
queue (str, optional): Job processing resource.
status (str, optional): Job status, e.g. job-completed, job-failed, job-started, job-queued.
tag (str, optional): User job tag/identifier.
min_duration (int, optional): Minimum duration in seconds
max_duration (int, optional): Maximum duration in seconds
type (str, optional): Type, available values: process
datetime (str, optional): Either a date-time or an interval, half-bounded or bounded. Date and time expressions adhere to RFC 3339. Half-bounded intervals are expressed using double-dots.
priority (int, optional): Job priority, 0-9
"""
raise Exception("listJobs() is no longer supported. Use list_jobs() with appropriate keyword arguments instead.")

def list_jobs(self, *,
process_id=None,
limit=None,
Expand Down Expand Up @@ -1085,6 +1346,18 @@ def list_jobs(self, *,
)
return response

def getJobMetrics(self):
"""
Get metrics for an OGC job.

DEPRECATION NOTICE: This method is no longer supported. Use the following instead:

get_job_metrics(job_id)

job_id: The job ID to get metrics for
"""
raise Exception("getJobMetrics() is no longer supported. Use get_job_metrics(job_id) instead.")

def get_job_metrics(self, job_id):
"""
Get metrics for an OGC job
Expand Down
Loading