diff --git a/.github/workflows/tag_and_publish.yml b/.github/workflows/tag_and_publish.yml index 7f2e0b3..a9bb671 100644 --- a/.github/workflows/tag_and_publish.yml +++ b/.github/workflows/tag_and_publish.yml @@ -10,10 +10,10 @@ jobs: continue-on-error: true steps: - uses: actions/checkout@v3 - - name: Set up Python 3.8 + - name: Set up Python 3.9 uses: actions/setup-python@v3 with: - python-version: 3.8 + python-version: 3.9 - name: Install dependencies run: | python -m pip install -e .[dev] --no-cache-dir @@ -66,10 +66,10 @@ jobs: - uses: actions/checkout@v3 - name: Pull latest changes run: git pull origin main - - name: Set up Python 3.8 + - name: Set up Python 3.9 uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: 3.9 - name: Install dependencies run: | pip install --upgrade setuptools wheel twine build diff --git a/.github/workflows/test_and_lint.yml b/.github/workflows/test_and_lint.yml index aa639dd..3481b8e 100644 --- a/.github/workflows/test_and_lint.yml +++ b/.github/workflows/test_and_lint.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ '3.8', '3.9', '3.10' ] + python-version: [ '3.9', '3.10', '3.11', '3.12' ] steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} diff --git a/README.md b/README.md index 8a230bd..d8af705 100644 --- a/README.md +++ b/README.md @@ -31,80 +31,41 @@ pip install -e .[dev] The package includes helper functions to interact with Code Ocean: -### `CodeOceanJob` +### `APIHandler` -This class enables one to run a job that: +This class enables one to: -1. Registers a new asset to Code Ocean from s3 -2. Runs a capsule/pipeline on the newly registered asset (or an existing assey) -3. Captures the run results into a new asset +1. Update asset tags in Code Ocean +2. Find external data assets that do not exist in S3 +3. Find external data assets -Steps 1 and 3 are optional, while step 2 (running the computation) is mandatory. 
-
-Here is a full example that registers a new ecephys asset, runs the spike sorting
-capsule with some parameters, and registers the results:
 
 ```python
 import os
 
-from aind_codeocean_api.codeocean import CodeOceanClient
-from aind_codeocean_utils.codeocean_job import (
-    CodeOceanJob, CodeOceanJobConfig
-)
+from codeocean import CodeOcean
+from aind_codeocean_utils.api_handler import APIHandler
 
-# Set up the CodeOceanClient from aind_codeocean_api
+# Get token and domain parameters for CodeOcean client
 CO_TOKEN = os.environ["CO_TOKEN"]
 CO_DOMAIN = os.environ["CO_DOMAIN"]
-co_client = CodeOceanClient(domain=CO_DOMAIN, token=CO_TOKEN)
-
-# Define Job Parameters
-job_config_dict = dict(
-    register_config = dict(
-        asset_name="test_dataset_for_codeocean_job",
-        mount="ecephys_701305_2023-12-26_12-22-25",
-        bucket="aind-ephys-data",
-        prefix="ecephys_701305_2023-12-26_12-22-25",
-        tags=["codeocean_job_test", "ecephys", "701305", "raw"],
-        custom_metadata={
-            "modality": "extracellular electrophysiology",
-            "data level": "raw data",
-        },
-        viewable_to_everyone=True
-    ),
-    run_capsule_config = dict(
-        data_assets=None, # when None, the newly registered asset will be used
-        capsule_id="a31e6c81-49a5-4f1c-b89c-2d47ae3e02b4",
-        run_parameters=["--debug", "--no-remove-out-channels"]
-    ),
-    capture_result_config = dict(
-        process_name="sorted",
-        tags=["np-ultra"] # additional tags to the ones inherited from input
-    )
-)
+co_client = CodeOcean(domain=CO_DOMAIN, token=CO_TOKEN)
 
-# instantiate config model
-job_config = CodeOceanJobConfig(**job_config_dict)
+api_handler = APIHandler(co_client)
 
-# instantiate code ocean job
-co_job = CodeOceanJob(co_client=co_client, job_config=job_config)
+data_assets = [
+    co_client.data_assets.get_data_asset(data_asset_id="abc"),
+    co_client.data_assets.get_data_asset(data_asset_id="def")
+]
 
-# run and wait for results
-job_response = co_job.run_job()
+api_handler.update_tags(
+    tags_to_remove=["test"],
+    tags_to_add=["new_tag"],
+    data_assets=data_assets,
+)
 ```
 
-This job will:
-1. Register the `test_dataset_for_codeocean_job` asset from the specified s3 bucket and prefix
-2. Run the capsule `a31e6c81-49a5-4f1c-b89c-2d47ae3e02b4` with the specified parameters
-3. Register the result as `test_dataset_for_codeocean_job_sorter_{date-time}`
-
-
-To run a computation on existing data assets, do not provide the `register_config` and
-provide the `data_asset` field in the `run_capsule_config`.
-
-To skip capturing the result, do not provide the `capture_result_config` option.
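+
+The handler can also audit external data assets. The sketch below continues the
+example above; it assumes a `boto3` S3 client with read access to the source
+buckets (AWS credentials configured in the environment), and the cutoff date is
+only an illustration:
+
+```python
+from datetime import datetime
+
+import boto3
+
+# An S3 client is needed for the existence checks against S3
+s3_client = boto3.client("s3")
+api_handler = APIHandler(co_client=co_client, s3=s3_client)
+
+# External data assets whose source bucket/prefix no longer exists in S3
+for asset in api_handler.find_nonexistent_external_data_assets():
+    print(asset.name)
+
+# Archived data assets that were last used before the cutoff datetime
+stale_assets = api_handler.find_archived_data_assets_to_delete(
+    keep_after=datetime(2024, 1, 1)
+)
+```
+
+Passing `dryrun=True` to `APIHandler` makes `update_tags` log the updates it
+would make instead of calling the Code Ocean API.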
- - ## Contributing ### Linters and testing diff --git a/pyproject.toml b/pyproject.toml index dbe105a..f8500ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "aind-codeocean-utils" description = "Generated from aind-library-template" license = {text = "MIT"} -requires-python = ">=3.7" +requires-python = ">=3.9" authors = [ {name = "Allen Institute for Neural Dynamics"} ] @@ -17,9 +17,8 @@ readme = "README.md" dynamic = ["version"] dependencies = [ - "aind-codeocean-api>=0.4.0", - "aind-data-schema>=0.38.0", - "pydantic>=2.7" + "codeocean>=0.3.0", + "boto3" ] [project.optional-dependencies] @@ -41,7 +40,7 @@ version = {attr = "aind_codeocean_utils.__version__"} [tool.black] line-length = 79 -target_version = ['py37'] +target_version = ['py39'] exclude = ''' ( diff --git a/src/aind_codeocean_utils/alert_bot.py b/src/aind_codeocean_utils/alert_bot.py deleted file mode 100644 index 5f108cf..0000000 --- a/src/aind_codeocean_utils/alert_bot.py +++ /dev/null @@ -1,92 +0,0 @@ -"""Module with Alert Bot for notifications on MS Teams""" - -from typing import Optional - -import requests - - -class AlertBot: - """Class to handle sending alerts and messages in MS Teams.""" - - def __init__(self, url: str): - """ - AlertBot constructor - - Parameters - ---------- - url : str - The url to send the message to - """ - self.url = url - - @staticmethod - def _create_body_text(message: str, extra_text: Optional[str]) -> dict: - """ - Parse strings into appropriate format to send to ms teams channel. - Check here: - https://learn.microsoft.com/en-us/microsoftteams/platform/ - task-modules-and-cards/cards/cards-reference#adaptive-card - Parameters - ---------- - message : str - The main message content - extra_text : Optional[str] - Additional text to send in card body - - Returns - ------- - dict - - """ - body: list = [ - { - "type": "TextBlock", - "size": "Medium", - "weight": "Bolder", - "text": message, - } - ] - if extra_text is not None: - body.append({"type": "TextBlock", "text": extra_text}) - contents = { - "type": "message", - "attachments": [ - { - "contentType": "application/vnd.microsoft.card.adaptive", - "content": { - "type": "AdaptiveCard", - "body": body, - "$schema": ( - "http://adaptivecards.io/schemas/" - "adaptive-card.json" - ), - "version": "1.0", - }, - } - ], - } - return contents - - def send_message( - self, message: str, extra_text: Optional[str] = None - ) -> Optional[requests.Response]: - """ - Sends a message via requests.post - - Parameters - ---------- - message : str - The main message content - extra_text : Optional[str] - Additional text to send in card body - - Returns - ------- - Optional[requests.Response] - If the url is None, only print and return None. Otherwise, post - message to url and return the response. 
- - """ - contents = self._create_body_text(message, extra_text) - response = requests.post(self.url, json=contents) - return response diff --git a/src/aind_codeocean_utils/api_handler.py b/src/aind_codeocean_utils/api_handler.py index 0432598..0898cf0 100644 --- a/src/aind_codeocean_utils/api_handler.py +++ b/src/aind_codeocean_utils/api_handler.py @@ -1,19 +1,17 @@ """Module of classes to handle interfacing with the Code Ocean index.""" import logging -import time from datetime import datetime from typing import Dict, Iterator, List, Optional -import requests -from aind_codeocean_api.codeocean import CodeOceanClient -from aind_codeocean_api.models.computations_requests import ( - ComputationDataAsset, -) -from aind_codeocean_api.models.data_assets_requests import ( - CreateDataAssetRequest, -) from botocore.client import BaseClient +from codeocean import CodeOcean +from codeocean.data_asset import ( + DataAsset, + DataAssetSearchParams, + DataAssetType, + DataAssetUpdateParams, +) class APIHandler: @@ -21,7 +19,7 @@ class APIHandler: def __init__( self, - co_client: CodeOceanClient, + co_client: CodeOcean, s3: Optional[BaseClient] = None, dryrun: bool = False, ): @@ -46,7 +44,7 @@ def update_tags( tags_to_remove: Optional[List[str]] = None, tags_to_add: Optional[List[str]] = None, tags_to_replace: Optional[Dict[str, str]] = None, - data_assets=Iterator[dict], + data_assets=Iterator[DataAsset], ) -> None: """ Updates tags for a list of data assets. Will first remove tags in the @@ -64,7 +62,7 @@ def update_tags( tags_to_replace: Optional[Dict[str, str]] Optional dictionary of tags to replace. For example, {"old_tag0": "new_tag0", "old_tag1": "new_tag1"}. - data_assets : Iterator[dict] + data_assets : Iterator[DataAsset] An iterator of data assets. The shape of the response is described at: "https://docs.codeocean.com @@ -86,37 +84,39 @@ def update_tags( ) for data_asset in data_assets: # Remove tags in tags_to_remove - tags = ( - set() - if data_asset.get("tags") is None - else set(data_asset["tags"]) - ) + # noinspection PyTypeChecker + tags = set() if data_asset.tags is None else set(data_asset.tags) tags.difference_update(tags_to_remove) tags.update(tags_to_add) mapped_tags = {tags_to_replace.get(tag, tag) for tag in tags} - data_asset_id = data_asset["id"] - data_asset_name = data_asset["name"] - logging.debug(f"Updating data asset: {data_asset}") - # new_name is a required field, we can set it to the original name + data_asset_id = data_asset.id + data_asset_name = data_asset.name + update_data_asset_params = DataAssetUpdateParams( + name=data_asset_name, + description=data_asset.description, + tags=mapped_tags, + mount=data_asset.mount, + ) + logging.debug( + f"Updating data asset: {data_asset} with " + f"{update_data_asset_params}." 
+ ) if self.dryrun is True: logging.info( - f"(dryrun): " - f"co_client.update_data_asset(" - f"data_asset_id={data_asset_id}," - f"new_name={data_asset_name}," - f"new_tags={mapped_tags},)" + f"(dryrun): co_client.data_assets.update_metadata(" + f"data_asset_id={data_asset_id}, " + f"update_params={update_data_asset_params})" ) else: - response = self.co_client.update_data_asset( + response = self.co_client.data_assets.update_metadata( data_asset_id=data_asset_id, - new_name=data_asset_name, - new_tags=list(mapped_tags), + update_params=update_data_asset_params, ) - logging.info(response.json()) + logging.info(f"Code Ocean Response: {response}") def find_archived_data_assets_to_delete( self, keep_after: datetime - ) -> List[dict]: + ) -> List[DataAsset]: """ Find archived data assets which were last used before the keep_after datetime. @@ -127,22 +127,23 @@ def find_archived_data_assets_to_delete( Returns ------- - List[dict] + List[DataAsset] A list of data assets objects """ - assets = self.co_client.search_all_data_assets(archived=True).json()[ - "results" - ] + search_params = DataAssetSearchParams(archived=True, limit=1000) + assets = self.co_client.data_assets.search_data_assets_iterator( + search_params=search_params + ) assets_to_delete = [] for asset in assets: - created = datetime.fromtimestamp(asset["created"]) + created = datetime.fromtimestamp(asset.created) last_used = ( - datetime.fromtimestamp(asset["last_used"]) - if asset["last_used"] != 0 + datetime.fromtimestamp(asset.last_used) + if asset.last_used != 0 else None ) @@ -158,15 +159,15 @@ def find_archived_data_assets_to_delete( internal_count = 0 internal_size = 0 for asset in assets_to_delete: - size = asset.get("size", 0) - is_external = "sourceBucket" in asset + size = asset.size if asset.size else 0 + is_external = asset.source_bucket is not None if is_external: external_count += 1 external_size += size else: internal_count += 1 internal_size += size - logging.info(f"name: {asset['name']}, type: {asset['type']}") + logging.info(f"name: {asset.name}, type: {asset.type}") logging.info( f"{len(assets_to_delete)}/{len(assets)} archived assets deletable" @@ -180,26 +181,29 @@ def find_archived_data_assets_to_delete( return assets_to_delete - def find_external_data_assets(self) -> Iterator[dict]: + def find_external_data_assets(self) -> Iterator[DataAsset]: """ Find external data assets by checking if the data asset responses from CodeOcean have a source bucket and are of type 'dataset'. Returns ------- - Iterator[dict] + Iterator[DataAsset] An iterator of data assets objects """ - - response = self.co_client.search_all_data_assets(type="dataset") - assets = response.json()["results"] + search_params = DataAssetSearchParams( + type=DataAssetType.Dataset, limit=1000 + ) + assets = self.co_client.data_assets.search_data_assets_iterator( + search_params=search_params + ) for asset in assets: - bucket = asset.get("sourceBucket", {}).get("bucket", None) - if bucket: + source_bucket = asset.source_bucket + if source_bucket and source_bucket.bucket and source_bucket.prefix: yield asset - def find_nonexistent_external_data_assets(self) -> Iterator[dict]: + def find_nonexistent_external_data_assets(self) -> Iterator[DataAsset]: """ Find external data assets that do not exist in S3. Makes a call to CodeOcean and returns an iterator over external data assets. @@ -208,19 +212,17 @@ def find_nonexistent_external_data_assets(self) -> Iterator[dict]: data asset will not be added to the return response. 
Returns ------- - Iterator[dict] + Iterator[DataAsset] An iterator of data asset objects. """ for asset in self.find_external_data_assets(): - sb = asset["sourceBucket"] + sb = asset.source_bucket try: - exists = self._bucket_prefix_exists(sb["bucket"], sb["prefix"]) - logging.debug( - f"{sb['bucket']} {sb['prefix']} exists? {exists}" - ) + exists = self._bucket_prefix_exists(sb.bucket, sb.prefix) + logging.debug(f"{sb.bucket} {sb.prefix} exists? {exists}") if not exists: yield asset except Exception as e: @@ -243,135 +245,9 @@ def _bucket_prefix_exists(self, bucket: str, prefix: str) -> bool: """ + # TODO: use head object command, which is cheaper prefix = prefix.rstrip("/") resp = self.s3.list_objects( Bucket=bucket, Prefix=prefix, Delimiter="/", MaxKeys=1 ) return "CommonPrefixes" in resp - - def wait_for_data_availability( - self, - data_asset_id: str, - timeout_seconds: int = 300, - pause_interval=10, - ) -> requests.Response: - """ - There is a lag between when a register data request is made and - when the data is available to be used in a capsule. - Parameters - ---------- - data_asset_id : str - ID of the data asset to check for. - timeout_seconds : int - Roughly how long the method should check if the data is available. - pause_interval : int - How many seconds between when the backend is queried. - - Returns - ------- - requests.Response - - """ - - num_of_checks = 0 - break_flag = False - time.sleep(pause_interval) - response = self.co_client.get_data_asset(data_asset_id) - if ((pause_interval * num_of_checks) > timeout_seconds) or ( - response.status_code == 200 - ): - break_flag = True - while not break_flag: - time.sleep(pause_interval) - response = self.co_client.get_data_asset(data_asset_id) - num_of_checks += 1 - if ((pause_interval * num_of_checks) > timeout_seconds) or ( - response.status_code == 200 - ): - break_flag = True - return response - - def check_data_assets( - self, data_assets: List[ComputationDataAsset] - ) -> None: - """ - Check if data assets exist. - - Parameters - ---------- - data_assets : list - List of data assets to check for. - - Raises - ------ - FileNotFoundError - If a data asset is not found. - ConnectionError - If there is an issue retrieving a data asset. - """ - for data_asset in data_assets: - assert isinstance( - data_asset, ComputationDataAsset - ), "Data assets must be of type ComputationDataAsset" - data_asset_id = data_asset.id - response = self.co_client.get_data_asset(data_asset_id) - if response.status_code == 404: - raise FileNotFoundError(f"Unable to find: {data_asset_id}") - elif response.status_code != 200: - raise ConnectionError( - f"There was an issue retrieving: {data_asset_id}" - ) - - def create_data_asset_and_update_permissions( - self, - request: CreateDataAssetRequest, - assets_viewable_to_everyone: bool = True, - ) -> requests.Response: - """ - Register a data asset. Can also optionally update the permissions on - the data asset. - - Parameters - ---------- - request : CreateDataAssetRequest - - Notes - ----- - The credentials for the s3 bucket must be set in the environment. - - Returns - ------- - requests.Response - """ - - create_data_asset_response = self.co_client.create_data_asset(request) - create_data_asset_response_json = create_data_asset_response.json() - - if create_data_asset_response_json.get("id") is None: - raise KeyError( - f"Something went wrong registering" - f" '{request.name}'. " - f"Response Status Code: " - f"{create_data_asset_response.status_code}. 
" - f"Response Message: {create_data_asset_response_json}" - ) - - if assets_viewable_to_everyone: - data_asset_id = create_data_asset_response_json["id"] - response_data_available = self.wait_for_data_availability( - data_asset_id - ) - - if response_data_available.status_code != 200: - raise FileNotFoundError(f"Unable to find: {data_asset_id}") - - # Make data asset viewable to everyone - update_data_perm_response = self.co_client.update_permissions( - data_asset_id=data_asset_id, everyone="viewer" - ) - logging.info( - "Permissions response: " - f"{update_data_perm_response.status_code}" - ) - - return create_data_asset_response diff --git a/src/aind_codeocean_utils/codeocean_job.py b/src/aind_codeocean_utils/codeocean_job.py deleted file mode 100644 index f93003b..0000000 --- a/src/aind_codeocean_utils/codeocean_job.py +++ /dev/null @@ -1,467 +0,0 @@ -""" -Utility for coordinating registration, processing, -and capture of results in Code Ocean -""" - -import logging -import time -from datetime import datetime -from enum import Enum -from typing import List, Optional, Tuple - -import requests -from aind_codeocean_api.codeocean import CodeOceanClient -from aind_codeocean_api.models.computations_requests import ( - ComputationDataAsset, - RunCapsuleRequest, -) -from aind_codeocean_api.models.data_assets_requests import ( - CreateDataAssetRequest, - Source, - Sources, - Target, - Targets, -) -from aind_data_schema.core.data_description import DataLevel -from aind_data_schema_models.data_name_patterns import datetime_to_name_string -from pydantic import BaseModel, Field - -from aind_codeocean_utils.api_handler import APIHandler - -logger = logging.getLogger(__name__) - - -class CustomMetadataKeys(str, Enum): - """ - Keys used for custom metadata in Code OCean - """ - - DATA_LEVEL = "data level" - - -def construct_asset_tags_and_metadata( - asset_name: str, tags: list = None, custom_metadata: dict = None -) -> tuple: - """Construct metadata for new data assets""" - tags = set(tags) if tags is not None else set() - custom_metadata = custom_metadata or dict() - custom_metadata = custom_metadata.copy() - - tokens = asset_name.split("_") - if len(tokens) >= 2: - platform, subject_id = tokens[0], tokens[1] - - tags.update((platform, subject_id)) - - custom_metadata.update( - { - "experiment type": platform, - "subject id": subject_id, - } - ) - - return tags, custom_metadata - - -def build_processed_data_asset_name(input_data_asset_name, process_name): - """Build a name for a processed data asset.""" - - capture_time = datetime_to_name_string(datetime.now()) - - return f"{input_data_asset_name}_{process_name}_{capture_time}" - - -def add_data_level_metadata( - data_level: DataLevel, - tags: List[str] = None, - custom_metadata: dict = None, -) -> Tuple[List[str], dict]: - """Add data level metadata to tags and custom metadata.""" - tags = set(tags or []) - tags.add(data_level.value) - - if data_level == DataLevel.DERIVED: - tags.discard(DataLevel.RAW.value) - - tags = sorted(list(tags)) - - custom_metadata = custom_metadata or {} - custom_metadata.update( - {CustomMetadataKeys.DATA_LEVEL.value: data_level.value} - ) - - return tags, custom_metadata - - -class ProcessConfig(BaseModel): - """ - Settings for processing data - """ - - request: RunCapsuleRequest = Field( - description="Request to run a capsule or pipeline." - ) - input_data_asset_mount: Optional[str] = Field( - default=None, - description=( - "Mount point for the input data asset. 
" - "This is only used to specify the mount of a newly registered " - "data asset." - ), - ) - poll_interval_seconds: Optional[int] = Field( - default=300, - description=( - "Time in seconds to wait between polling for the completion of " - "the computation." - ), - ) - timeout_seconds: Optional[int] = Field( - default=None, - description=( - "Time in seconds to wait for the computation to complete. " - "If None, the computation will be polled indefinitely." - ), - ) - - -class CaptureConfig(BaseModel): - """ - Settings for capturing results - """ - - request: Optional[CreateDataAssetRequest] = Field( - default=None, - description=( - "Request to create a data asset based on a processed result." - ), - ) - - process_name: Optional[str] = Field( - default="processed", description="Name of the process." - ) - input_data_asset_name: Optional[str] = Field( - default=None, description="Name of the input data asset." - ) - output_bucket: Optional[str] = Field( - default=None, description="Name of the output bucket." - ) - - -class CodeOceanJobConfig(BaseModel): - """ - Class for coordinating registration, processing, and capture of results in - Code Ocean - """ - - register_config: Optional[CreateDataAssetRequest] = None - process_config: Optional[ProcessConfig] = None - capture_config: Optional[CaptureConfig] = None - assets_viewable_to_everyone: bool = Field( - default=True, - description=( - "Whether the assets should be viewable to everyone. " - "If True, the assets will be viewable to everyone. " - "If False, the assets will be viewable only to the user who " - "registered them." - ), - ) - add_subject_and_platform_metadata: bool = Field( - default=True, - description=( - "Whether to add metadata about the subject and platform to the " - "data assets." - ), - ) - add_data_level_metadata: bool = Field( - default=True, - description=( - "Whether to add metadata about the data level to the data assets." 
- ), - ) - - -class CodeOceanJob: - """ - Class for coordinating registration, processing, and capture of results in - Code Ocean - """ - - def __init__( - self, co_client: CodeOceanClient, job_config: CodeOceanJobConfig - ): - """ - The CodeOceanJob constructor - """ - job_config = job_config.model_copy(deep=True) - self.api_handler = APIHandler(co_client=co_client) - self.register_config = job_config.register_config - self.process_config = job_config.process_config - self.capture_config = job_config.capture_config - self.assets_viewable_to_everyone = ( - job_config.assets_viewable_to_everyone - ) - self.add_data_level_metadata = job_config.add_data_level_metadata - self.add_subject_and_platform_metadata = ( - job_config.add_subject_and_platform_metadata - ) - - def run_job(self): - """Run the job.""" - - register_data_response = None - process_response = None - capture_response = None - - if self.capture_config: - assert ( - self.process_config is not None - ), "process_config must be provided to capture results" - - if self.register_config: - register_data_response = self.register_data( - request=self.register_config - ) - - if self.process_config: - process_response = self.process_data( - register_data_response=register_data_response - ) - - if self.capture_config: - capture_response = self.capture_result( - process_response=process_response - ) - - return register_data_response, process_response, capture_response - - def register_data( - self, request: CreateDataAssetRequest - ) -> requests.Response: - """Register the data asset, also handling metadata tagging.""" - tags = request.tags or [] - custom_metadata = request.custom_metadata or {} - if self.add_subject_and_platform_metadata: - tags, custom_metadata = construct_asset_tags_and_metadata( - request.name, tags, custom_metadata - ) - if self.add_data_level_metadata: - tags, custom_metadata = add_data_level_metadata( - DataLevel.RAW, - tags, - custom_metadata, - ) - request.tags = tags - request.custom_metadata = custom_metadata - - # TODO handle non-aws sources - if request.source.aws is not None: - assert ( - request.source.aws.keep_on_external_storage is True - ), "Data assets must be kept on external storage." - - response = self.api_handler.create_data_asset_and_update_permissions( - request=request, - assets_viewable_to_everyone=self.assets_viewable_to_everyone, - ) - - return response - - def process_data( - self, register_data_response: requests.Response = None - ) -> requests.Response: - """Process the data, handling the case where the data was just - registered upstream.""" - - if self.process_config.request.data_assets is None: - self.process_config.request.data_assets = [] - - assert isinstance( - self.process_config.request.data_assets, list - ), "data_assets must be a list" - - if len(self.process_config.request.data_assets) > 0: - if isinstance(self.process_config.request.data_assets[0], dict): - self.process_config.request.data_assets = [ - ComputationDataAsset(**asset) - for asset in self.process_config.request.data_assets - ] - else: - assert register_data_response is not None, ( - "No input data assets provided and no data asset was " - "registered upstream." 
- ) - - if register_data_response: - input_data_asset_id = register_data_response.json()["id"] - - if self.process_config.input_data_asset_mount: - input_data_asset_mount = ( - self.process_config.input_data_asset_mount - ) - else: - input_data_asset_mount = self.register_config.mount - - self.process_config.request.data_assets.append( - ComputationDataAsset( - id=input_data_asset_id, mount=input_data_asset_mount - ) - ) - - self.api_handler.check_data_assets( - self.process_config.request.data_assets - ) - - run_capsule_response = self.api_handler.co_client.run_capsule( - self.process_config.request - ) - run_capsule_response_json = run_capsule_response.json() - - if run_capsule_response_json.get("id") is None: - raise KeyError( - f"Something went wrong running the capsule or pipeline. " - f"Response Status Code: {run_capsule_response.status_code}. " - f"Response Message: {run_capsule_response_json}" - ) - - computation_id = run_capsule_response_json["id"] - - # TODO: We may need to clean up the loop termination logic - if self.process_config.poll_interval_seconds: - executing = True - num_checks = 0 - while executing: - num_checks += 1 - time.sleep(self.process_config.poll_interval_seconds) - computation_response = ( - self.api_handler.co_client.get_computation(computation_id) - ) - curr_computation_state = computation_response.json() - - if (curr_computation_state["state"] == "completed") or ( - (self.process_config.timeout_seconds is not None) - and ( - self.process_config.poll_interval_seconds * num_checks - >= self.process_config.timeout_seconds - ) - ): - executing = False - return run_capsule_response - - def capture_result( # noqa: C901 - self, process_response: requests.Response - ) -> requests.Response: - """Capture the result of the processing that just finished.""" - - computation_id = process_response.json()["id"] - - create_data_asset_request = self.capture_config.request - if create_data_asset_request is None: - create_data_asset_request = CreateDataAssetRequest( - name=None, - mount=None, - tags=[], - custom_metadata={}, - ) - - input_data_asset_name = self.capture_config.input_data_asset_name - output_bucket = self.capture_config.output_bucket - - if create_data_asset_request.name is None: - if self.register_config is not None: - asset_name = build_processed_data_asset_name( - self.register_config.name, - self.capture_config.process_name, - ) - # add input tags and custom metadata to result asset - create_data_asset_request.tags.extend( - self.register_config.tags - ) - create_data_asset_request.custom_metadata.update( - self.register_config.custom_metadata - ) - elif ( - self.process_config is not None - and self.process_config.request.data_assets is not None - ): - data_asset_ids = self.process_config.request.data_assets - # for single input data asset, use input data asset name - assert isinstance( - data_asset_ids, list - ), "data_assets must be a list" - # make sure data_assets is a list of ComputationDataAsset - if isinstance(data_asset_ids[0], dict): - data_asset_ids = [ - ComputationDataAsset(**asset) - for asset in data_asset_ids - ] - if len(data_asset_ids) > 1 and input_data_asset_name is None: - raise AssertionError( - "Data asset name not provided and " - "multiple data assets were provided in " - "the process configuration" - ) - # for multiple input data assets, - # propagate all tags and custom metadata - existing_tags = [] - existing_custom_metadata = {} - for data_asset_id in data_asset_ids: - response = self.api_handler.co_client.get_data_asset( - 
data_asset_id.id - ) - response_json = response.json() - existing_tags.extend(response_json.get("tags", [])) - existing_custom_metadata.update( - response_json.get("custom_metadata", {}) - ) - if len(data_asset_ids) == 1: - response_json = response.json() - input_data_asset_name = ( - input_data_asset_name or response_json["name"] - ) - asset_name = build_processed_data_asset_name( - input_data_asset_name, - self.capture_config.process_name, - ) - # add input tags and custom metadata to result asset - create_data_asset_request.tags.extend(existing_tags) - create_data_asset_request.custom_metadata.update( - existing_custom_metadata - ) - else: - assert ( - input_data_asset_name is not None - ), "Data asset name not provided" - - create_data_asset_request.name = asset_name - - if create_data_asset_request.mount is None: - create_data_asset_request.mount = create_data_asset_request.name - - create_data_asset_request.source = Source( - computation=Sources.Computation(id=computation_id) - ) - - if output_bucket is not None: - prefix = create_data_asset_request.name - create_data_asset_request.target = Target( - aws=Targets.AWS(bucket=output_bucket, prefix=prefix) - ) - - if self.add_data_level_metadata: - tags, custom_metadata = add_data_level_metadata( - DataLevel.DERIVED, - create_data_asset_request.tags, - create_data_asset_request.custom_metadata, - ) - create_data_asset_request.tags = tags - create_data_asset_request.custom_metadata = custom_metadata - - capture_result_response = ( - self.api_handler.create_data_asset_and_update_permissions( - request=create_data_asset_request, - assets_viewable_to_everyone=self.assets_viewable_to_everyone, - ) - ) - - return capture_result_response diff --git a/tests/__init__.py b/tests/__init__.py index ee61766..816e430 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,7 +1 @@ """Testing library""" - -from re import match - -from pydantic import __version__ - -PYD_VERSION = match(r"(\d+.\d+).\d+", __version__).group(1) diff --git a/tests/resources/co_responses.json b/tests/resources/co_responses.json index 318bc86..2b834f3 100644 --- a/tests/resources/co_responses.json +++ b/tests/resources/co_responses.json @@ -8,11 +8,13 @@ "id": "0faf14aa-13b9-450d-b26a-632935a4b763", "last_used": 0, "name": "ecephys_655019_2023-04-03_18-10-10", + "mount": "ecephys_655019_2023-04-03_18-10-10", "size": 3559699653, - "sourceBucket": { + "source_bucket": { "bucket": "aind-ephys-data-dev-u5u0i5", "origin": "aws", - "prefix": "ecephys_655019_2023-04-03_18-10-10" + "prefix": "ecephys_655019_2023-04-03_18-10-10", + "external": true }, "state": "ready", "tags": [ @@ -32,11 +34,13 @@ "id": "84586a1c-79cc-4240-b340-6049fe8469c2", "last_used": 0, "name": "ecephys_655019_2023-04-03_18-17-09", + "mount": "ecephys_655019_2023-04-03_18-17-09", "size": 3559464097, - "sourceBucket": { + "source_bucket": { "bucket": "aind-ephys-data-dev-u5u0i5", "origin": "aws", - "prefix": "ecephys_655019_2023-04-03_18-17-09" + "prefix": "ecephys_655019_2023-04-03_18-17-09", + "external": true }, "state": "ready", "tags": [ @@ -56,10 +60,12 @@ "id": "1936ae3a-73a8-422c-a7b1-1768732c6289", "last_used": 0, "name": "ecephys_661398_2023-03-31_17-01-09_nwb_2023-06-01_14-50-08", - "sourceBucket": { + "mount": "ecephys_661398_2023-03-31_17-01-09_nwb_2023-06-01_14-50-08", + "source_bucket": { "bucket": "", "origin": "local", - "prefix": "" + "prefix": "", + "external": false }, "state": "ready", "tags": [ @@ -77,10 +83,12 @@ "id": "2481baf2-e9e8-4416-9a0b-d2ffe5782071", "last_used": 0, "name": 
"ecephys_660166_2023-03-16_18-30-14_curated_2023-03-24_17-54-16", - "sourceBucket": { + "mount": "ecephys_660166_2023-03-16_18-30-14_curated_2023-03-24_17-54-16", + "source_bucket": { "bucket": "", "origin": "local", - "prefix": "" + "prefix": "", + "external": false }, "state": "ready", "tags": [ @@ -95,10 +103,12 @@ "id": "fcd8bc84-bd48-4af7-8826-da5ceb5cdd3a", "last_used": 0, "name": "ecephys_636766_2023-01-25_00-00-00", - "sourceBucket": { + "mount": "ecephys_636766_2023-01-25_00-00-00", + "source_bucket": { "bucket": "", "origin": "local", - "prefix": "" + "prefix": "", + "external": false }, "state": "ready", "tags": [ @@ -113,10 +123,12 @@ "id": "fc915970-5489-4b6d-af94-620b067cd2cd", "last_used": 0, "name": "ecephys_636766_2023-01-23_00-00-00_sorted-ks2.5_2023-06-01_14-48-42", - "sourceBucket": { + "mount": "ecephys_636766_2023-01-23_00-00-00_sorted-ks2.5_2023-06-01_14-48-42", + "source_bucket": { "bucket": "", "origin": "local", - "prefix": "" + "prefix": "", + "external": false }, "state": "ready", "tags": [ @@ -131,10 +143,12 @@ "id": "63f2d2de-4af8-4397-94ab-9484c8e8c847", "last_used": 0, "name": "ecephys_622155_2022-05-31_15-29-16_2023-06-01_14-45-05", - "sourceBucket": { + "mount": "ecephys_622155_2022-05-31_15-29-16_2023-06-01_14-45-05", + "source_bucket": { "bucket": "", "origin": "local", - "prefix": "" + "prefix": "", + "external": false }, "state": "ready", "tags": [ @@ -148,7 +162,8 @@ "files": 1540, "id": "fa312ea4-6068-4a4e-a40b-e8c73c9660d0", "last_used": 0, - "name": "multiplane-ophys_438912_2019-04-17_15-19-14_processed_2024-02-14_19-44-46", + "name": "multiplane-ophys_438912_2019-04-17_15-19-14_processed_2024-02-14_19-44-46", + "mount": "multiplane-ophys_438912_2019-04-17_15-19-14_processed_2024-02-14_19-44-46", "provenance": { "capsule": "36173b3e-2e7b-4510-ad51-8b7e90be08bc", "commit": "1ad2c544ceb4bc2da208f78c8786f5d107a29d6d", diff --git a/tests/test_alert_bot.py b/tests/test_alert_bot.py deleted file mode 100644 index 8ffd6d7..0000000 --- a/tests/test_alert_bot.py +++ /dev/null @@ -1,48 +0,0 @@ -"""Testing Alerts""" - -import unittest -from unittest import mock -from unittest.mock import MagicMock, call - -from aind_codeocean_utils.alert_bot import AlertBot - - -class TestAlertBot(unittest.TestCase): - """Tests methods in AlertBot class""" - - @mock.patch("requests.post") - def test_send_message(self, mocked_post: MagicMock) -> None: - """ - Tests that the message is being sent correctly - Parameters - ---------- - mocked_post : MagicMock - mock the request.post calls - mock_print : MagicMock - mock the print calls - - Returns - ------- - None - - """ - alert_bot = AlertBot("testing_url.com") - alert_bot.send_message(message="Test Message") - contents0 = alert_bot._create_body_text("Test Message", None) - mocked_post.assert_called_once_with("testing_url.com", json=contents0) - alert_bot.send_message( - message="Another Message", extra_text="With extra text" - ) - contents1 = alert_bot._create_body_text( - "Another Message", "With extra text" - ) - mocked_post.assert_has_calls( - [ - call("testing_url.com", json=contents0), - call("testing_url.com", json=contents1), - ] - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_api_handler.py b/tests/test_api_handler.py index 34ede19..7ade4d5 100644 --- a/tests/test_api_handler.py +++ b/tests/test_api_handler.py @@ -7,13 +7,14 @@ from pathlib import Path from unittest.mock import MagicMock, call, patch -from aind_codeocean_api.codeocean import CodeOceanClient -from requests import Response 
+from codeocean import CodeOcean +from codeocean.data_asset import DataAsset, DataAssetSearchParams from aind_codeocean_utils.api_handler import APIHandler TEST_DIRECTORY = Path(os.path.dirname(os.path.realpath(__file__))) -MOCK_RESPONSE_FILE = TEST_DIRECTORY / "resources" / "co_responses.json" +MOCK_RESPONSE_FILE = TEST_DIRECTORY / "resources" / "iterator_response.json" +MOCK_RESPONSE_FILE2 = TEST_DIRECTORY / "resources" / "co_responses.json" class TestAPIHandler(unittest.TestCase): @@ -24,252 +25,92 @@ def setUpClass(cls): """Load mock_db before running tests.""" co_mock_token = "abc-123" - co_mock_domain = "https://aind.codeocean.com" + co_mock_domain = "https://example.com" - with open(MOCK_RESPONSE_FILE) as f: - json_contents = json.load(f) - mock_search_all_data_assets_success_response = Response() - mock_search_all_data_assets_success_response.status_code = 200 - mock_search_all_data_assets_success_response._content = json.dumps( - json_contents["search_all_data_assets"] - ).encode("utf-8") - mock_co_client = CodeOceanClient( - domain=co_mock_domain, token=co_mock_token - ) + with open(MOCK_RESPONSE_FILE2) as f: + contents = json.load(f) + mock_co_client = CodeOcean(domain=co_mock_domain, token=co_mock_token) mock_s3_client = MagicMock() - cls.mock_search_all_data_assets_success_response = ( - mock_search_all_data_assets_success_response - ) + cls.mock_search_all_data_assets = [ + DataAsset.from_json(json.dumps(r)) + for r in contents["search_all_data_assets"]["results"] + ] cls.api_handler = APIHandler(co_client=mock_co_client) cls.api_handler_dry = APIHandler(co_client=mock_co_client, dryrun=True) cls.api_handler_s3 = APIHandler( co_client=mock_co_client, s3=mock_s3_client ) - @patch( - "aind_codeocean_api.codeocean.CodeOceanClient.search_all_data_assets" - ) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.update_data_asset") - @patch("logging.debug") - @patch("logging.info") + @patch("codeocean.data_asset.DataAssets.search_data_assets_iterator") + @patch("codeocean.data_asset.DataAssets.update_metadata") def test_update_tags( self, - mock_log_info: MagicMock, - mock_log_debug: MagicMock, mock_update: MagicMock, - mock_get: MagicMock, + mock_search_data_assets_iterator: MagicMock, ): """Tests update tags changes tags correctly.""" - mock_get.return_value = ( - self.mock_search_all_data_assets_success_response - ) - mock_update_response = Response() - mock_update_response.status_code = 200 - mock_update_response._content = b'{"message": "success"}' - mock_update.return_value = mock_update_response - response = self.api_handler.co_client.search_all_data_assets() - data_assets = response.json()["results"] - self.api_handler.update_tags( - tags_to_remove=["test"], - tags_to_add=["new_tag"], - tags_to_replace={"ECEPHYS": "ecephys"}, - data_assets=data_assets, - ) - - expected_calls = [ - { - "data_asset_id": "0faf14aa-13b9-450d-b26a-632935a4b763", - "new_name": "ecephys_655019_2023-04-03_18-10-10", - "new_tags": {"raw", "ecephys", "655019", "new_tag"}, - }, - { - "data_asset_id": "84586a1c-79cc-4240-b340-6049fe8469c2", - "new_name": "ecephys_655019_2023-04-03_18-17-09", - "new_tags": {"ecephys", "655019", "new_tag", "raw"}, - }, - { - "data_asset_id": "1936ae3a-73a8-422c-a7b1-1768732c6289", - "new_name": ( - "ecephys_661398_2023-03-31_17-01-09" - "_nwb_2023-06-01_14-50-08" - ), - "new_tags": {"new_tag"}, - }, - { - "data_asset_id": "2481baf2-e9e8-4416-9a0b-d2ffe5782071", - "new_name": ( - "ecephys_660166_2023-03-16_18-30-14" - "_curated_2023-03-24_17-54-16" - ), - "new_tags": 
{"new_tag"}, - }, - { - "data_asset_id": "fcd8bc84-bd48-4af7-8826-da5ceb5cdd3a", - "new_name": "ecephys_636766_2023-01-25_00-00-00", - "new_tags": {"new_tag"}, - }, - { - "data_asset_id": "fc915970-5489-4b6d-af94-620b067cd2cd", - "new_name": ( - "ecephys_636766_2023-01-23_00-00-00" - "_sorted-ks2.5_2023-06-01_14-48-42" - ), - "new_tags": {"new_tag"}, - }, - { - "data_asset_id": "63f2d2de-4af8-4397-94ab-9484c8e8c847", - "new_name": ( - "ecephys_622155_2022-05-31_15-29-16_2023-06-01_14-45-05" - ), - "new_tags": {"new_tag"}, - }, - { - "data_asset_id": "fa312ea4-6068-4a4e-a40b-e8c73c9660d0", - "new_name": ( - "multiplane-ophys_438912_2019-04-17_15-19-14" - "_processed_2024-02-14_19-44-46" - ), - "new_tags": {"pipeline-v3.0", "multiplane-ophys", "new_tag"}, - }, - ] - actual_calls = [c.kwargs for c in mock_update.mock_calls] - for row in actual_calls: - row["new_tags"] = set(row["new_tags"]) - self.assertEqual(expected_calls, actual_calls) - expected_debug_calls = [ - call(f"Updating data asset: {data_asset}") - for data_asset in data_assets - ] - mock_log_debug.assert_has_calls(expected_debug_calls) - mock_log_info.assert_has_calls( - [call({"message": "success"}) for _ in data_assets] - ) - - @patch( - "aind_codeocean_api.codeocean.CodeOceanClient.search_all_data_assets" - ) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.update_data_asset") - @patch("logging.debug") - @patch("logging.info") - def test_update_tags_with_nones( - self, - mock_log_info: MagicMock, - mock_log_debug: MagicMock, - mock_update: MagicMock, - mock_get: MagicMock, - ): - """Tests that NoneType inputs are handled correctly.""" - mock_get.return_value = ( - self.mock_search_all_data_assets_success_response - ) - mock_update_response = Response() - mock_update_response.status_code = 200 - mock_update_response._content = b'{"message": "success"}' - mock_update.return_value = mock_update_response - response = self.api_handler.co_client.search_all_data_assets() - data_assets = response.json()["results"] - self.api_handler.update_tags( - tags_to_add=["new_tag"], - tags_to_replace={"ECEPHYS": "ecephys"}, - data_assets=data_assets, - ) - self.api_handler.update_tags( - tags_to_remove=["test"], - tags_to_replace={"ECEPHYS": "ecephys"}, - data_assets=data_assets, - ) - self.api_handler.update_tags( - tags_to_remove=["test"], - tags_to_add=["new_tag"], - data_assets=data_assets, - ) - self.api_handler.update_tags( - tags_to_add=["new_tag"], - data_assets=data_assets, - ) - self.api_handler.update_tags( - tags_to_remove=["test"], - data_assets=data_assets, - ) - self.api_handler.update_tags( - tags_to_replace={"ECEPHYS": "ecephys"}, - data_assets=data_assets, - ) - self.api_handler.update_tags( - data_assets=data_assets, - ) - data_assets_with_no_tags = [ - { - "created": 1685645105, - "description": "", - "files": 10, - "id": "63f2d2de-4af8-4397-94ab-9484c8e8c847", - "last_used": 0, - "name": "test_data_with_empty_tags", - "sourceBucket": { - "bucket": "", - "origin": "local", - "prefix": "", - }, - "state": "ready", - "tags": [], - "type": "dataset", - }, - { - "created": 1685645105, - "description": "", - "files": 10, - "id": "63f2d2de-4af8-4397-94ab-9484c8e8c847", - "last_used": 0, - "name": "test_data_with_missing_field", - "sourceBucket": { - "bucket": "", - "origin": "local", - "prefix": "", - }, - "state": "ready", - "type": "dataset", - }, - ] - self.api_handler.update_tags( - tags_to_replace={"ECEPHYS": "ecephys"}, - data_assets=data_assets_with_no_tags, - ) - mock_log_info.assert_called() - 
mock_log_debug.assert_called() - - @patch( - "aind_codeocean_api.codeocean.CodeOceanClient.search_all_data_assets" - ) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.update_data_asset") - @patch("logging.debug") - @patch("logging.info") + mock_search_data_assets_iterator.return_value = ( + self.mock_search_all_data_assets + ) + data_assets = ( + self.api_handler.co_client.data_assets.search_data_assets_iterator( + search_params=DataAssetSearchParams(limit=1000) + ) + ) + with self.assertLogs(level="DEBUG") as captured: + self.api_handler.update_tags( + tags_to_remove=["test"], + tags_to_add=["new_tag"], + tags_to_replace={"ECEPHYS": "ecephys"}, + data_assets=data_assets, + ) + self.assertEqual(16, len(captured.output)) + self.assertEqual( + {"ecephys", "655019", "raw", "new_tag"}, + mock_update.mock_calls[0].kwargs["update_params"].tags, + ) + self.assertEqual( + {"raw", "655019", "ecephys", "new_tag"}, + mock_update.mock_calls[2].kwargs["update_params"].tags, + ) + self.assertEqual( + {"new_tag"}, + mock_update.mock_calls[4].kwargs["update_params"].tags, + ) + self.assertEqual( + {"new_tag"}, + mock_update.mock_calls[6].kwargs["update_params"].tags, + ) + self.assertEqual( + {"new_tag"}, + mock_update.mock_calls[8].kwargs["update_params"].tags, + ) + + @patch("codeocean.data_asset.DataAssets.search_data_assets_iterator") + @patch("codeocean.data_asset.DataAssets.update_metadata") def test_update_tags_dryrun( self, - mock_log_info: MagicMock, - mock_log_debug: MagicMock, mock_update: MagicMock, - mock_get: MagicMock, + mock_search_data_assets_iterator: MagicMock, ): """Tests update tags changes tags correctly.""" - mock_get.return_value = ( - self.mock_search_all_data_assets_success_response + mock_search_data_assets_iterator.return_value = ( + self.mock_search_all_data_assets ) - response = self.api_handler_dry.co_client.search_all_data_assets() - data_assets = response.json()["results"] - self.api_handler_dry.update_tags( - tags_to_remove=["test"], - tags_to_add=["new_tag"], - data_assets=data_assets, + data_assets = ( + self.api_handler.co_client.data_assets.search_data_assets_iterator( + search_params=DataAssetSearchParams(limit=1000) + ) ) + with self.assertLogs(level="DEBUG") as captured: + self.api_handler_dry.update_tags( + tags_to_remove=["test"], + tags_to_add=["new_tag"], + data_assets=data_assets, + ) mock_update.assert_not_called() - expected_debug_calls = [ - call(f"Updating data asset: {data_asset}") - for data_asset in data_assets - ] - mock_log_debug.assert_has_calls(expected_debug_calls) - mock_log_info.assert_called() + self.assertEqual(16, len(captured.output)) def test_bucket_prefix_exists(self): """Tests bucket_prefix_exists evaluation from boto response.""" @@ -288,22 +129,16 @@ def test_bucket_prefix_exists(self): ) self.assertTrue(resp) - @patch( - "aind_codeocean_api.codeocean.CodeOceanClient.search_all_data_assets" - ) - @patch("logging.error") - @patch("logging.debug") + @patch("codeocean.data_asset.DataAssets.search_data_assets_iterator") def test_find_external_assets( self, - mock_debug: MagicMock, - mock_log_error: MagicMock, - mock_get: MagicMock, + mock_search_data_assets_iterator: MagicMock, ): - """Tests find_external_data_assets and - find_nonexistent_external_data_assets methods""" - mock_get.return_value = ( - self.mock_search_all_data_assets_success_response + """Tests find_external_data_assets method""" + mock_search_data_assets_iterator.return_value = ( + self.mock_search_all_data_assets ) + self.api_handler_s3.s3.list_objects.side_effect = 
[ {}, Exception("Error"), @@ -314,6 +149,26 @@ def test_find_external_assets( resp = list(self.api_handler_s3.find_external_data_assets()) self.assertEqual(2, len(resp)) + @patch("codeocean.data_asset.DataAssets.search_data_assets_iterator") + @patch("logging.error") + @patch("logging.debug") + def test_find_non_existent_external_assets( + self, + mock_debug: MagicMock, + mock_log_error: MagicMock, + mock_search_data_assets_iterator: MagicMock, + ): + """Tests find_nonexistent_external_data_assets method""" + mock_search_data_assets_iterator.return_value = ( + self.mock_search_all_data_assets + ) + self.api_handler_s3.s3.list_objects.side_effect = [ + {}, + Exception("Error"), + {"CommonPrefixes": 1}, + {"CommonPrefixes": 2}, + ] + resp = list( self.api_handler_s3.find_nonexistent_external_data_assets() ) @@ -325,22 +180,20 @@ def test_find_external_assets( ) mock_log_error.assert_called_once() - @patch( - "aind_codeocean_api.codeocean.CodeOceanClient.search_all_data_assets" - ) + @patch("codeocean.data_asset.DataAssets.search_data_assets_iterator") @patch("logging.debug") @patch("logging.info") def test_find_archived_data_assets_to_delete( self, mock_log_info: MagicMock, mock_log_debug: MagicMock, - mock_get: MagicMock, + mock_search_data_assets_iterator: MagicMock, ): """Tests find_archived_data_assets_to_delete method with successful responses from CodeOCean""" - mock_get.return_value = ( - self.mock_search_all_data_assets_success_response + mock_search_data_assets_iterator.return_value = ( + self.mock_search_all_data_assets ) keep_after = datetime.datetime(year=2023, month=9, day=1) diff --git a/tests/test_codeocean_job.py b/tests/test_codeocean_job.py deleted file mode 100644 index 91defba..0000000 --- a/tests/test_codeocean_job.py +++ /dev/null @@ -1,1468 +0,0 @@ -"""Tests for the codeocean_job module""" - -import unittest -from unittest.mock import MagicMock, call, patch - -import requests -from aind_codeocean_api.codeocean import CodeOceanClient -from aind_codeocean_api.models.computations_requests import ( - ComputationDataAsset, - RunCapsuleRequest, -) -from aind_codeocean_api.models.data_assets_requests import ( - CreateDataAssetRequest, - Source, - Target, - Sources, - Targets, -) - -from aind_codeocean_utils.codeocean_job import ( - CodeOceanJob, - CodeOceanJobConfig, - ProcessConfig, - CaptureConfig, - build_processed_data_asset_name, -) - - -class TestCodeOceanJob(unittest.TestCase): - """Tests for CodeOceanJob class""" - - @classmethod - def setUpClass(cls): - """Set up basic configs that can be used across all tests.""" - basic_register_data_config = CreateDataAssetRequest( - name="platform_subject_date_time", - mount="deleteme", - source=Source( - aws=Sources.AWS( - bucket="asset_bucket", - prefix="asset_prefix", - keep_on_external_storage=True, - ) - ), - tags=sorted(["raw", "a", "b"]), - custom_metadata={ - "key1": "value1", - "key2": "value2", - "data level": "raw", - }, - ) - basic_process_config = ProcessConfig( - request=RunCapsuleRequest( - capsule_id="123-abc", - pipeline_id=None, - data_assets=[ - dict(id="999888", mount="some_mount"), - dict(id="12345", mount="some_mount_2"), - ], - parameters=["param1", "param2"], - ) - ) - basic_process_config_no_assets = ProcessConfig( - request=RunCapsuleRequest( - capsule_id="123-abc", - pipeline_id=None, - data_assets=None, - parameters=["param1", "param2"], - ), - input_data_asset_mount="custom-mount", - ) - basic_process_data_input_mount_config = ProcessConfig( - request=RunCapsuleRequest( - capsule_id="123-abc", - 
pipeline_id=None, - data_assets=[ - ComputationDataAsset(id="999888", mount="some_mount"), - ComputationDataAsset(id="12345", mount="some_mount_2"), - ], - parameters=["param1", "param2"], - version=3, - ), - input_data_asset_mount="custom-mount", - poll_interval_seconds=400, - timeout_seconds=10000, - ) - basic_process_data_one_asset_config = ProcessConfig( - request=RunCapsuleRequest( - capsule_id=None, - pipeline_id="123-abc", - data_assets=[ - dict(id="12345", mount="some_mount_2"), - ], - parameters=["param1", "param2"], - version=3, - ), - poll_interval_seconds=400, - timeout_seconds=10000, - ) - basic_capture_config = CaptureConfig( - process_name="some_process", - output_bucket="some_output_bucket", - request=CreateDataAssetRequest( - mount="some_mount", - name="some_asset_name", - tags=["x", "y"], - custom_metadata={ - "key1": "value1", - "key2": "value2", - }, - ), - ) - basic_capture_config_no_request = CaptureConfig( - process_name="some_process", - request=None, - ) - none_vals_capture_config = CaptureConfig( - process_name="some_process", - request=CreateDataAssetRequest( - mount=None, - name=None, - tags=["x", "y", "a", "b", "raw"], - custom_metadata={ - "key1": "value1", - "key2": "value2", - "data level": "raw", - }, - ), - ) - none_vals_capture_config_w_asset_name = CaptureConfig( - process_name="some_process", - request=CreateDataAssetRequest( - mount=None, - name="some_asset_name", - tags=["x", "y", "a", "b", "raw"], - custom_metadata={ - "key1": "value1", - "key2": "value2", - "data level": "raw", - }, - ), - ) - - co_domain = "http://codeocean.acme.org" - co_token = "co_api_token_1234" - cls.co_client = CodeOceanClient(domain=co_domain, token=co_token) - cls.basic_codeocean_job_config = CodeOceanJobConfig( - register_config=basic_register_data_config, - process_config=basic_process_config, - capture_config=basic_capture_config, - ) - cls.basic_codeocean_job_config_no_assets = CodeOceanJobConfig( - register_config=basic_register_data_config, - process_config=basic_process_config_no_assets, - capture_config=basic_capture_config, - ) - cls.basic_input_mount_codeocean_job_config = CodeOceanJobConfig( - register_config=basic_register_data_config, - process_config=basic_process_data_input_mount_config, - ) - cls.no_capture_request_job_config = CodeOceanJobConfig( - register_config=basic_register_data_config, - process_config=basic_process_config_no_assets, - capture_config=basic_capture_config_no_request, - ) - cls.no_reg_codeocean_job_config_no_asset_name = CodeOceanJobConfig( - register_config=None, - process_config=basic_process_config, - capture_config=none_vals_capture_config, - ) - cls.no_reg_codeocean_job_config = CodeOceanJobConfig( - register_config=None, - process_config=basic_process_config, - capture_config=none_vals_capture_config_w_asset_name, - ) - cls.one_asset_codeocean_job_config = CodeOceanJobConfig( - register_config=None, - process_config=basic_process_data_one_asset_config, - capture_config=basic_capture_config_no_request, - ) - cls.multi_asset_codeocean_job_config = CodeOceanJobConfig( - register_config=None, - process_config=basic_process_config, - capture_config=none_vals_capture_config, - ) - cls.no_process_codeocean_job_config = CodeOceanJobConfig( - register_config=None, - process_config=None, - capture_config=none_vals_capture_config, - ) - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_data_asset") - def test_wait_for_data_availability_success( - self, mock_get_data_asset: MagicMock, 
mock_sleep: MagicMock - ): - """Tests _wait_for_data_availability""" - some_response = requests.Response() - some_response.status_code = 200 - fake_data_asset_id = "abc-123" - some_response.json = { - "created": 1666322134, - "description": "", - "files": 1364, - "id": fake_data_asset_id, - "last_used": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "size": 3632927966, - "state": "ready", - "tags": ["ecephys", "raw"], - "type": "dataset", - } - mock_get_data_asset.return_value = some_response - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - response = codeocean_job.api_handler.wait_for_data_availability( - data_asset_id=fake_data_asset_id - ) - self.assertEqual(200, response.status_code) - self.assertEqual(some_response.json, response.json) - mock_sleep.assert_called_once_with(10) - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_data_asset") - def test_wait_for_data_availability_timeout( - self, mock_get_data_asset: MagicMock, mock_sleep: MagicMock - ): - """Tests _wait_for_data_availability with timeout""" - some_response = requests.Response() - some_response.status_code = 500 - some_response.json = {"Something went wrong!"} - mock_get_data_asset.return_value = some_response - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - response = codeocean_job.api_handler.wait_for_data_availability( - data_asset_id="123" - ) - self.assertEqual(500, response.status_code) - self.assertEqual(some_response.json, response.json) - self.assertEqual(32, mock_sleep.call_count) - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_computation") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.run_capsule") - def test_process_data_check_not_found( - self, - mock_run_capsule: MagicMock, - mock_get_computation: MagicMock, - mock_get_data_asset: MagicMock, - mock_sleep: MagicMock, - ): - """Tests _process_data with data asset not found response""" - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - some_response = requests.Response() - some_response.status_code = 404 - some_response.json = {"message: Not Found"} - mock_get_data_asset.return_value = some_response - - codeocean_job.process_config.request.data_assets = [ - ComputationDataAsset(id="999888", mount="some_mount") - ] - - with self.assertRaises(FileNotFoundError) as e: - codeocean_job.process_data() - - self.assertEqual( - "FileNotFoundError('Unable to find: 999888')", repr(e.exception) - ) - mock_run_capsule.assert_not_called() - mock_get_computation.assert_not_called() - mock_sleep.assert_not_called() - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_computation") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.run_capsule") - def test_process_data_check_server_failed( - self, - mock_run_capsule: MagicMock, - mock_get_computation: MagicMock, - mock_get_data_asset: MagicMock, - mock_sleep: MagicMock, - ): - """Tests _process_data with a server error response""" - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - some_response = requests.Response() - some_response.status_code = 500 - some_response.json 
= {"Something went wrong"} - mock_get_data_asset.return_value = some_response - with self.assertRaises(ConnectionError) as e: - codeocean_job.process_data() - - self.assertEqual( - "ConnectionError('There was an issue retrieving: 999888')", - repr(e.exception), - ) - mock_run_capsule.assert_not_called() - mock_get_computation.assert_not_called() - mock_sleep.assert_not_called() - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_computation") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.run_capsule") - def test_process_data_check_passed( - self, - mock_run_capsule: MagicMock, - mock_get_computation: MagicMock, - mock_get_data_asset: MagicMock, - mock_sleep: MagicMock, - ): - """Tests _process_data with successful responses from code ocean""" - some_get_data_asset_response = requests.Response() - some_get_data_asset_response.status_code = 200 - some_get_data_asset_response.json = lambda: ( - { - "created": 1666322134, - "description": "", - "files": 1364, - "id": "999888", - "last_used": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "size": 3632927966, - "state": "ready", - "tags": ["ecephys", "raw"], - "type": "dataset", - } - ) - - mock_get_data_asset.return_value = some_get_data_asset_response - - some_run_response = requests.Response() - some_run_response.status_code = 200 - fake_computation_id = "comp-abc-123" - some_run_response.json = lambda: ( - { - "created": 1646943238, - "has_results": False, - "id": fake_computation_id, - "name": "Run 6943238", - "run_time": 1, - "state": "initializing", - } - ) - mock_run_capsule.return_value = some_run_response - - some_comp_response = requests.Response() - some_comp_response.status_code = 200 - some_comp_response.json = lambda: ( - { - "created": 1668125314, - "end_status": "succeeded", - "has_results": False, - "id": fake_computation_id, - "name": "Run With Parameters 8125314", - "parameters": [ - {"name": "", "value": '{"p_1": {"p1_1": "some_path"}}'} - ], - "run_time": 8, - "state": "completed", - } - ) - mock_get_computation.return_value = some_comp_response - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - - response = codeocean_job.process_data( - register_data_response=some_get_data_asset_response - ) - mock_sleep.assert_called_once_with(300) - self.assertEqual(200, response.status_code) - self.assertEqual( - { - "created": 1646943238, - "has_results": False, - "id": "comp-abc-123", - "name": "Run 6943238", - "run_time": 1, - "state": "initializing", - }, - response.json(), - ) - codeocean_job_no_assets = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config_no_assets, - ) - - response = codeocean_job_no_assets.process_data( - register_data_response=some_get_data_asset_response - ) - - # test failed response ID - some_run_response = requests.Response() - some_run_response.status_code = 200 - some_run_response.json = lambda: ( - { - "created": 1668125314, - "end_status": "succeeded", - "has_results": False, - "id": None, - "name": "Run With Parameters 8125314", - "parameters": [ - {"name": "", "value": '{"p_1": {"p1_1": "some_path"}}'} - ], - "run_time": 8, - "state": "completed", - } - ) - mock_run_capsule.return_value = some_run_response - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - with self.assertRaises(KeyError) as e: - 
codeocean_job.process_data( - register_data_response=some_get_data_asset_response - ) - - assert "Something went wrong running the capsule or pipeline." in repr( - e.exception - ) - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.create_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.update_permissions") - @patch( - "aind_codeocean_utils.api_handler." - "APIHandler.wait_for_data_availability" - ) - def test_create_data_asset_and_update_permissions( - self, - mock_wait_for_data_availability: MagicMock, - mock_update_permissions: MagicMock, - mock_create_data_asset: MagicMock, - mock_sleep: MagicMock, - ): - """Tests _create_data_asset_and_update_permissions""" - fake_data_asset_id = "abc-123" - - some_create_data_asset_response = requests.Response() - some_create_data_asset_response.status_code = 200 - some_create_data_asset_response.json = lambda: ( - { - "created": 1641420832, - "description": "", - "files": 0, - "id": fake_data_asset_id, - "lastUsed": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "sizeInBytes": 0, - "state": "DATA_ASSET_STATE_DRAFT", - "tags": ["ecephys", "raw"], - "type": "DATA_ASSET_TYPE_DATASET", - } - ) - mock_create_data_asset.return_value = some_create_data_asset_response - - some_wait_for_data_response = requests.Response() - some_wait_for_data_response.status_code = 200 - some_wait_for_data_response.json = lambda: ( - { - "created": 1666322134, - "description": "", - "files": 1364, - "id": fake_data_asset_id, - "last_used": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "size": 3632927966, - "state": "ready", - "tags": ["ecephys", "raw"], - "type": "dataset", - } - ) - mock_wait_for_data_availability.return_value = ( - some_wait_for_data_response - ) - - some_update_permissions_response = requests.Response() - some_update_permissions_response.status_code = 204 - mock_update_permissions.return_value = some_update_permissions_response - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - actual_response = ( - codeocean_job.api_handler.create_data_asset_and_update_permissions( - request=codeocean_job.register_config, - assets_viewable_to_everyone=( - codeocean_job.assets_viewable_to_everyone - ), - ) - ) - self.assertEqual(some_create_data_asset_response, actual_response) - mock_sleep.assert_not_called() - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.create_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.update_permissions") - @patch( - "aind_codeocean_utils.api_handler.APIHandler." 
- "wait_for_data_availability" - ) - def test_create_data_asset_and_update_permissions_failure( - self, - mock_wait_for_data_availability: MagicMock, - mock_update_permissions: MagicMock, - mock_create_data_asset: MagicMock, - mock_sleep: MagicMock, - ): - """ - Tests _create_data_asset_and_update_permissions with - a fail response - """ - fake_data_asset_id = "abc-123" - - some_create_data_asset_response = requests.Response() - some_create_data_asset_response.status_code = 200 - some_create_data_asset_response.json = lambda: ( - { - "created": 1641420832, - "description": "", - "files": 0, - "id": fake_data_asset_id, - "lastUsed": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "sizeInBytes": 0, - "state": "DATA_ASSET_STATE_DRAFT", - "tags": ["ecephys", "raw"], - "type": "DATA_ASSET_TYPE_DATASET", - } - ) - mock_create_data_asset.return_value = some_create_data_asset_response - - some_wait_for_data_response = requests.Response() - some_wait_for_data_response.status_code = 500 - some_wait_for_data_response.json = {"Something went wrong!"} - mock_wait_for_data_availability.return_value = ( - some_wait_for_data_response - ) - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - with self.assertRaises(FileNotFoundError) as e: - codeocean_job.api_handler.create_data_asset_and_update_permissions( - request=codeocean_job.register_config, - assets_viewable_to_everyone=( - codeocean_job.assets_viewable_to_everyone - ), - ) - self.assertEqual( - "FileNotFoundError('Unable to find: abc-123')", repr(e.exception) - ) - mock_update_permissions.assert_not_called() - mock_sleep.assert_not_called() - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.create_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.update_permissions") - @patch( - "aind_codeocean_utils.api_handler.APIHandler." 
- "wait_for_data_availability" - ) - def test_capture_result( - self, - mock_wait_for_data_availability: MagicMock, - mock_update_permissions: MagicMock, - mock_create_data_asset: MagicMock, - mock_sleep: MagicMock, - ): - """Tests capture_results""" - fake_data_asset_id = "abc-123" - - some_create_data_asset_response = requests.Response() - some_create_data_asset_response.status_code = 200 - some_create_data_asset_response.json = lambda: ( - { - "created": 1641420832, - "description": "", - "files": 0, - "id": fake_data_asset_id, - "lastUsed": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "sizeInBytes": 0, - "state": "DATA_ASSET_STATE_DRAFT", - "tags": ["ecephys", "raw"], - "type": "DATA_ASSET_TYPE_DATASET", - } - ) - mock_create_data_asset.return_value = some_create_data_asset_response - - some_wait_for_data_response = requests.Response() - some_wait_for_data_response.status_code = 200 - some_wait_for_data_response.json = lambda: ( - { - "created": 1666322134, - "description": "", - "files": 1364, - "id": fake_data_asset_id, - "last_used": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "size": 3632927966, - "state": "ready", - "tags": ["ecephys", "raw"], - "type": "dataset", - } - ) - mock_wait_for_data_availability.return_value = ( - some_wait_for_data_response - ) - - some_update_permissions_response = requests.Response() - some_update_permissions_response.status_code = 204 - mock_update_permissions.return_value = some_update_permissions_response - - some_process_response = requests.Response() - some_process_response.status_code = 200 - some_process_response.json = lambda: ( - { - "created": 1668125314, - "end_status": "succeeded", - "has_results": False, - "id": "fake_id", - "name": "Run With Parameters 8125314", - "parameters": [ - {"name": "", "value": '{"p_1": {"p1_1": "some_path"}}'} - ], - "run_time": 8, - "state": "completed", - } - ) - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - actual_response = codeocean_job.capture_result( - process_response=some_process_response - ) - self.assertEqual(some_create_data_asset_response, actual_response) - mock_sleep.assert_not_called() - - # test no capture request - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.no_capture_request_job_config, - ) - actual_response = codeocean_job.capture_result( - process_response=some_process_response - ) - self.assertEqual(some_create_data_asset_response, actual_response) - mock_sleep.assert_not_called() - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.create_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.update_permissions") - @patch( - "aind_codeocean_utils.api_handler.APIHandler." 
- "wait_for_data_availability" - ) - def test_capture_result_additional_tags_and_metadata( - self, - mock_wait_for_data_availability: MagicMock, - mock_update_permissions: MagicMock, - mock_get_data_asset: MagicMock, - mock_create_data_asset: MagicMock, - mock_sleep: MagicMock, - ): - """Tests capture_results with additional tags and metadata""" - fake_data_asset_id = "abc-123" - # - some_create_data_asset_response = requests.Response() - some_create_data_asset_response.status_code = 200 - some_create_data_asset_response.json = lambda: ( - { - "created": 1641420832, - "description": "", - "files": 0, - "id": fake_data_asset_id, - "lastUsed": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "sizeInBytes": 0, - "state": "DATA_ASSET_STATE_DRAFT", - "tags": ["ecephys", "raw"], - "type": "DATA_ASSET_TYPE_DATASET", - } - ) - mock_create_data_asset.return_value = some_create_data_asset_response - - some_wait_for_data_response = requests.Response() - some_wait_for_data_response.status_code = 200 - some_wait_for_data_response.json = lambda: ( - { - "created": 1666322134, - "description": "", - "files": 1364, - "id": fake_data_asset_id, - "last_used": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "size": 3632927966, - "state": "ready", - "tags": ["ecephys", "raw"], - "type": "dataset", - } - ) - mock_wait_for_data_availability.return_value = ( - some_wait_for_data_response - ) - - some_update_permissions_response = requests.Response() - some_update_permissions_response.status_code = 204 - mock_update_permissions.return_value = some_update_permissions_response - - some_process_response = requests.Response() - some_process_response.status_code = 200 - some_process_response.json = lambda: ( - { - "created": 1668125314, - "end_status": "succeeded", - "has_results": False, - "id": "124fq", - "name": "Run With Parameters 8125314", - "parameters": [ - {"name": "", "value": '{"p_1": {"p1_1": "some_path"}}'} - ], - "run_time": 8, - "state": "completed", - } - ) - - # check that duplicated tags and metadata are not added - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - capture_tags = codeocean_job.capture_config.request.tags.copy() - capture_metadata = codeocean_job.capture_config.request.custom_metadata - - capture_metadata_input = capture_metadata.copy() - capture_metadata_input.update({"data level": "raw"}) - capture_metadata_output = capture_metadata.copy() - capture_metadata_output.update({"data level": "derived"}) - - codeocean_job.capture_result(process_response=some_process_response) - mock_create_data_asset.assert_has_calls( - [ - call( - CreateDataAssetRequest( - name="some_asset_name", - tags=sorted(list(set(capture_tags + ["derived"]))), - mount="some_mount", - description=None, - source=Source( - aws=None, - gcp=None, - computation=Sources.Computation( - id="124fq", path=None - ), - ), - target=Target( - aws=Targets.AWS( - bucket="some_output_bucket", - prefix="some_asset_name", - ) - ), - custom_metadata=capture_metadata_output, - ) - ) - ] - ) - mock_sleep.assert_not_called() - - some_get_data_asset_response = requests.Response() - some_get_data_asset_response.status_code = 200 - some_get_data_asset_response.json = lambda: ( - { - "name": "some_custom_asset_name", - "tags": ["my-custom-input-tag"], - "custom_metadata": {"data level": "raw", "key1": "value1"}, - } - ) - mock_get_data_asset.return_value = some_get_data_asset_response - - # test inheriting tags and metadata from input data asset - 
mock_create_data_asset.reset_mock() - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.one_asset_codeocean_job_config, - ) - codeocean_job.capture_result(process_response=some_process_response) - - captured_asset_name = build_processed_data_asset_name( - "some_custom_asset_name", codeocean_job.capture_config.process_name - ) - mock_create_data_asset.assert_has_calls( - [ - call( - CreateDataAssetRequest( - name=captured_asset_name, - tags=["derived", "my-custom-input-tag"], - mount=captured_asset_name, - description=None, - source=Source( - aws=None, - gcp=None, - computation=Sources.Computation( - id="124fq", path=None - ), - ), - target=None, - custom_metadata={ - "data level": "derived", - "key1": "value1", - }, - ) - ) - ] - ) - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.create_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.update_permissions") - @patch( - "aind_codeocean_utils.api_handler.APIHandler." - "wait_for_data_availability" - ) - def test_capture_result_none_vals( - self, - mock_wait_for_data_availability: MagicMock, - mock_update_permissions: MagicMock, - mock_get_data_asset: MagicMock, - mock_create_data_asset: MagicMock, - mock_sleep: MagicMock, - ): - """Tests capture_results with asset_name and mount set to None""" - fake_data_asset_id = "abc-123" - - some_create_data_asset_response = requests.Response() - some_create_data_asset_response.status_code = 200 - some_create_data_asset_response.json = lambda: ( - { - "created": 1641420832, - "description": "", - "files": 0, - "id": fake_data_asset_id, - "lastUsed": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "sizeInBytes": 0, - "state": "DATA_ASSET_STATE_DRAFT", - "tags": ["ecephys", "raw"], - "type": "DATA_ASSET_TYPE_DATASET", - } - ) - mock_create_data_asset.return_value = some_create_data_asset_response - - some_wait_for_data_response = requests.Response() - some_wait_for_data_response.status_code = 200 - some_wait_for_data_response.json = lambda: ( - { - "created": 1666322134, - "description": "", - "files": 1364, - "id": fake_data_asset_id, - "last_used": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "size": 3632927966, - "state": "ready", - "tags": ["ecephys", "raw"], - "type": "dataset", - } - ) - mock_wait_for_data_availability.return_value = ( - some_wait_for_data_response - ) - - some_update_permissions_response = requests.Response() - some_update_permissions_response.status_code = 204 - mock_update_permissions.return_value = some_update_permissions_response - - some_process_response = requests.Response() - some_process_response.status_code = 200 - some_process_response.json = lambda: ( - { - "created": 1668125314, - "end_status": "succeeded", - "has_results": False, - "id": "124fq", - "name": "Run With Parameters 8125314", - "parameters": [ - {"name": "", "value": '{"p_1": {"p1_1": "some_path"}}'} - ], - "run_time": 8, - "state": "completed", - } - ) - - # Test getting asset name from attached data asset - some_get_data_asset_response = requests.Response() - some_get_data_asset_response.status_code = 200 - some_get_data_asset_response.json = lambda: ( - { - "name": "some_input_data_asset_name", - } - ) - mock_get_data_asset.return_value = some_get_data_asset_response - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.one_asset_codeocean_job_config, - ) - - 
codeocean_job.capture_result(process_response=some_process_response) - - codeocean_job.capture_config.input_data_asset_name = ( - "some_input_data_asset_name" - ) - actual_response = codeocean_job.capture_result( - process_response=some_process_response - ) - self.assertEqual(some_create_data_asset_response, actual_response) - mock_sleep.assert_not_called() - - # Test exception when multiple input data assets is not provided - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.multi_asset_codeocean_job_config, - ) - with self.assertRaises(AssertionError) as e: - codeocean_job.capture_result( - process_response=some_process_response - ) - self.assertEqual( - ( - "AssertionError('Data asset name not provided and multiple " - "data assets were provided in the process configuration')" - ), - repr(e.exception), - ) - # Test exception when no input data assets is provided - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.no_process_codeocean_job_config, - ) - with self.assertRaises(AssertionError) as e: - codeocean_job.capture_result( - process_response=some_process_response - ) - self.assertEqual( - ("AssertionError('Data asset name not provided')"), - repr(e.exception), - ) - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.create_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.update_permissions") - @patch( - "aind_codeocean_utils.api_handler.APIHandler." - "wait_for_data_availability" - ) - def test_capture_result_registration_failed( - self, - mock_wait_for_data_availability: MagicMock, - mock_update_permissions: MagicMock, - mock_create_data_asset: MagicMock, - mock_sleep: MagicMock, - ): - """Tests capture_results with failed registration step""" - some_create_data_asset_response = requests.Response() - some_create_data_asset_response.status_code = 500 - some_create_data_asset_response.json = lambda: ( - {"messsage": "Something went wrong!"} - ) - mock_create_data_asset.return_value = some_create_data_asset_response - - some_process_response = requests.Response() - some_process_response.status_code = 200 - some_process_response.json = lambda: ( - { - "created": 1668125314, - "end_status": "succeeded", - "has_results": False, - "id": "124fq", - "name": "Run With Parameters 8125314", - "parameters": [ - {"name": "", "value": '{"p_1": {"p1_1": "some_path"}}'} - ], - "run_time": 8, - "state": "completed", - } - ) - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - with self.assertRaises(KeyError) as e: - codeocean_job.capture_result( - process_response=some_process_response - ) - - self.assertEqual( - ( - 'KeyError("Something went wrong registering ' - "'some_asset_name'. Response Status Code: 500. 
" - "Response Message: {'messsage': 'Something went wrong!'}\")" - ), - repr(e.exception), - ) - mock_sleep.assert_not_called() - mock_wait_for_data_availability.assert_not_called() - mock_update_permissions.assert_not_called() - - @patch("time.sleep", return_value=None) - @patch("aind_codeocean_api.codeocean.CodeOceanClient.create_data_asset") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.update_permissions") - @patch( - "aind_codeocean_utils.api_handler.APIHandler" - ".wait_for_data_availability" - ) - def test_capture_result_wait_for_data_failure( - self, - mock_wait_for_data_availability: MagicMock, - mock_update_permissions: MagicMock, - mock_create_data_asset: MagicMock, - mock_sleep: MagicMock, - ): - """Tests capture_results with wait_for_data failure""" - fake_data_asset_id = "abc-123" - - some_create_data_asset_response = requests.Response() - some_create_data_asset_response.status_code = 200 - some_create_data_asset_response.json = lambda: ( - { - "created": 1641420832, - "description": "", - "files": 0, - "id": fake_data_asset_id, - "lastUsed": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "sizeInBytes": 0, - "state": "DATA_ASSET_STATE_DRAFT", - "tags": ["ecephys", "raw"], - "type": "DATA_ASSET_TYPE_DATASET", - } - ) - mock_create_data_asset.return_value = some_create_data_asset_response - - some_wait_for_data_response = requests.Response() - some_wait_for_data_response.status_code = 500 - some_wait_for_data_response.json = lambda: ( - {"message": "Something went wrong!"} - ) - mock_wait_for_data_availability.return_value = ( - some_wait_for_data_response - ) - - some_process_response = requests.Response() - some_process_response.status_code = 200 - some_process_response.json = lambda: ( - { - "created": 1668125314, - "end_status": "succeeded", - "has_results": False, - "id": "124fq", - "name": "Run With Parameters 8125314", - "parameters": [ - {"name": "", "value": '{"p_1": {"p1_1": "some_path"}}'} - ], - "run_time": 8, - "state": "completed", - } - ) - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - with self.assertRaises(FileNotFoundError) as e: - codeocean_job.capture_result( - process_response=some_process_response - ) - - self.assertEqual( - "FileNotFoundError('Unable to find: abc-123')", repr(e.exception) - ) - mock_sleep.assert_not_called() - mock_update_permissions.assert_not_called() - - @patch("aind_codeocean_utils.codeocean_job.CodeOceanJob.capture_result") - @patch( - "aind_codeocean_utils.api_handler.APIHandler." 
- "create_data_asset_and_update_permissions" - ) - @patch("aind_codeocean_utils.codeocean_job.CodeOceanJob.process_data") - def test_run_job( - self, - mock_process_data: MagicMock, - mock_register_data: MagicMock, - mock_capture_result: MagicMock, - ): - """Tests run_job method""" - some_register_response = requests.Response() - some_register_response.status_code = 200 - fake_register_id = "12345" - custom_metadata = ( - self.basic_codeocean_job_config.register_config.custom_metadata - ) - some_register_response.json = lambda: ( - { - "created": 1666322134, - "description": "", - "files": 1364, - "id": fake_register_id, - "last_used": 0, - "name": "some_asset_name", - "state": "draft", - "custom_metadata": custom_metadata, - "tags": self.basic_codeocean_job_config.register_config.tags, - "type": "dataset", - } - ) - mock_register_data.return_value = some_register_response - - some_run_response = requests.Response() - some_run_response.status_code = 200 - fake_computation_id = "comp-abc-123" - some_run_response.json = lambda: ( - { - "created": 1646943238, - "has_results": False, - "id": fake_computation_id, - "name": "Run 6943238", - "run_time": 1, - "state": "initializing", - } - ) - mock_process_data.return_value = some_run_response - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_codeocean_job_config, - ) - codeocean_job.run_job() - request = self.basic_codeocean_job_config.register_config - request.tags = sorted(request.tags + ["platform", "subject"]) - request.custom_metadata.update( - {"experiment type": "platform", "subject id": "subject"} - ) - mock_register_data.assert_called_once_with( - request=request, - assets_viewable_to_everyone=( - codeocean_job.assets_viewable_to_everyone - ), - ) - - mock_process_data.assert_called_once_with( - register_data_response=some_register_response - ) - - # the process_data will propagate the additional_tags and - # additional_custom_metadata to the capture_results method - mock_capture_result.assert_called_once_with( - process_response=some_run_response - ) - - @patch( - "aind_codeocean_utils.api_handler.APIHandler." 
- "create_data_asset_and_update_permissions" - ) - @patch("aind_codeocean_utils.codeocean_job.CodeOceanJob.process_data") - def test_run_job_input_data( - self, - mock_process_data: MagicMock, - mock_register_data: MagicMock, - ): - """Tests run_job method""" - some_register_response = requests.Response() - some_register_response.status_code = 200 - fake_register_id = "12345" - custom_metadata = ( - self.basic_codeocean_job_config.register_config.custom_metadata - ) - some_register_response.json = lambda: ( - { - "created": 1666322134, - "description": "", - "files": 1364, - "id": fake_register_id, - "last_used": 0, - "name": "some_asset_name", - "state": "draft", - "custom_metadata": custom_metadata, - "tags": self.basic_codeocean_job_config.register_config.tags, - "type": "dataset", - } - ) - mock_register_data.return_value = some_register_response - - some_run_response = requests.Response() - some_run_response.status_code = 200 - fake_computation_id = "comp-abc-123" - some_run_response.json = lambda: ( - { - "created": 1646943238, - "has_results": False, - "id": fake_computation_id, - "name": "Run 6943238", - "run_time": 1, - "state": "initializing", - } - ) - mock_process_data.return_value = some_run_response - - self.basic_input_mount_codeocean_job_config.\ - add_subject_and_platform_metadata = ( - False - ) - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.basic_input_mount_codeocean_job_config, - ) - - codeocean_job.run_job() - mock_register_data.assert_called_once_with( - request=( - self.basic_input_mount_codeocean_job_config.register_config - ), - assets_viewable_to_everyone=( - codeocean_job.assets_viewable_to_everyone - ), - ) - - mock_process_data.assert_called_once_with( - register_data_response=some_register_response - ) - - @patch("aind_codeocean_utils.codeocean_job.CodeOceanJob.register_data") - @patch("aind_codeocean_utils.codeocean_job.CodeOceanJob.process_data") - @patch( - "aind_codeocean_utils.api_handler.APIHandler." 
- "create_data_asset_and_update_permissions" - ) - def test_run_job_no_registration( - self, - mock_create_data_asset: MagicMock, - mock_process_data: MagicMock, - mock_register_data: MagicMock, - ): - """Tests run_job method with Optional register_data set to None""" - some_run_response = requests.Response() - some_run_response.status_code = 200 - fake_computation_id = "comp-abc-123" - some_run_response.json = lambda: ( - { - "created": 1646943238, - "has_results": False, - "id": fake_computation_id, - "name": "Run 6943238", - "run_time": 1, - "state": "initializing", - } - ) - mock_process_data.return_value = some_run_response - - some_register_response = requests.Response() - some_register_response.status_code = 200 - fake_register_id = "12345" - custom_metadata = ( - self.basic_codeocean_job_config.register_config.custom_metadata - ) - some_register_response.json = lambda: ( - { - "created": 1666322134, - "description": "", - "files": 1364, - "id": fake_register_id, - "last_used": 0, - "name": "some_asset_name", - "state": "draft", - "custom_metadata": custom_metadata, - "tags": self.basic_codeocean_job_config.register_config.tags, - "type": "dataset", - } - ) - mock_register_data.return_value = some_register_response - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.no_reg_codeocean_job_config, - ) - codeocean_job.run_job() - - mock_register_data.assert_not_called() - - with self.assertRaises(AssertionError) as e: - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.no_reg_codeocean_job_config_no_asset_name, - ) - codeocean_job.run_job() - - self.assertEqual( - ( - "AssertionError('Data asset name not provided and multiple " - "data assets were provided in the process configuration')" - ), - repr(e.exception), - ) - - @patch("aind_codeocean_utils.codeocean_job.CodeOceanJob.capture_result") - @patch( - "aind_codeocean_utils.api_handler.APIHandler." 
- "create_data_asset_and_update_permissions" - ) - @patch("aind_codeocean_utils.codeocean_job.CodeOceanJob.process_data") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_data_asset") - def test_run_job_one_data_asset( - self, - mock_get_data_asset: MagicMock, - mock_process_data: MagicMock, - mock_register_data: MagicMock, - mock_capture_result: MagicMock, - ): - """Tests run_job method with only one data asset attached""" - - some_get_data_response = requests.Response() - some_get_data_response.status_code = 200 - fake_data_asset_id = "12345" - some_get_data_response.json = lambda: ( - { - "created": 1666322134, - "description": "", - "files": 1364, - "id": fake_data_asset_id, - "last_used": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "size": 3632927966, - "state": "ready", - "tags": ["ecephys", "raw"], - "type": "dataset", - } - ) - mock_get_data_asset.return_value = some_get_data_response - - some_run_response = requests.Response() - some_run_response.status_code = 200 - fake_computation_id = "comp-abc-123" - some_run_response.json = lambda: ( - { - "created": 1646943238, - "has_results": False, - "id": fake_computation_id, - "name": "Run 6943238", - "run_time": 1, - "state": "initializing", - } - ) - mock_process_data.return_value = some_run_response - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.one_asset_codeocean_job_config, - ) - codeocean_job.run_job() - mock_register_data.assert_not_called() - mock_process_data.assert_called_once_with(register_data_response=None) - # the process_data will propagate the additional_tags and - # additional_custom_metadata to the capture_results method - mock_capture_result.assert_called_once_with( - process_response=some_run_response - ) - - @patch("aind_codeocean_utils.codeocean_job.CodeOceanJob.capture_result") - @patch( - "aind_codeocean_utils.api_handler.APIHandler." 
- "create_data_asset_and_update_permissions" - ) - @patch("aind_codeocean_utils.codeocean_job.CodeOceanJob.process_data") - @patch("aind_codeocean_api.codeocean.CodeOceanClient.get_data_asset") - def test_run_job_one_data_asset_none_capture_config( - self, - mock_get_data_asset: MagicMock, - mock_process_data: MagicMock, - mock_register_data: MagicMock, - mock_capture_result: MagicMock, - ): - """Tests run_job without data asset name in capture result config""" - - some_get_data_response = requests.Response() - some_get_data_response.status_code = 200 - fake_data_asset_id = "12345" - some_get_data_response.json = lambda: ( - { - "created": 1666322134, - "description": "", - "files": 1364, - "id": fake_data_asset_id, - "last_used": 0, - "name": "ecephys_632269_2022-10-10_16-13-22", - "size": 3632927966, - "state": "ready", - "tags": ["ecephys", "raw"], - "type": "dataset", - } - ) - mock_get_data_asset.return_value = some_get_data_response - - some_run_response = requests.Response() - some_run_response.status_code = 200 - fake_computation_id = "comp-abc-123" - some_run_response.json = lambda: ( - { - "created": 1646943238, - "has_results": False, - "id": fake_computation_id, - "name": "Run 6943238", - "run_time": 1, - "state": "initializing", - } - ) - mock_process_data.return_value = some_run_response - - codeocean_job = CodeOceanJob( - co_client=self.co_client, - job_config=self.one_asset_codeocean_job_config, - ) - codeocean_job.run_job() - mock_register_data.assert_not_called() - mock_process_data.assert_called_once_with(register_data_response=None) - # the process_data will propagate the additional_tags and - # additional_custom_metadata to the capture_results method - mock_capture_result.assert_called_once_with( - process_response=some_run_response - ) - - def test_build_processed_data_asset_name(self): - """Tests build_processed_data_asset_name function""" - input_data_asset_name = "ecephys_00000_2022-10-10_16-13-22" - process_name = "test-process" - processed_asset_name = build_processed_data_asset_name( - input_data_asset_name, process_name - ) - assert processed_asset_name.startswith( - f"{input_data_asset_name}_{process_name}_" - ) - - -if __name__ == "__main__": - unittest.main()