diff --git a/mmif/serialize/annotation.py b/mmif/serialize/annotation.py index 6527f482..672fa0bb 100644 --- a/mmif/serialize/annotation.py +++ b/mmif/serialize/annotation.py @@ -374,7 +374,7 @@ def add_property(self, name: str, With the former method, the SDK will record the added property as a `Annotation` annotation object, separate from the original `Document` - object. See :meth:`.Mmif.generate_capital_annotations()` for more. + object. See :meth:`mmif.serialize.Mmif.generate_capital_annotations()` for more. A few notes to keep in mind: diff --git a/mmif/utils/cli/describe.py b/mmif/utils/cli/describe.py index eaf35856..52166658 100644 --- a/mmif/utils/cli/describe.py +++ b/mmif/utils/cli/describe.py @@ -3,7 +3,7 @@ import sys import textwrap from pathlib import Path -from typing import Union +from typing import Union, cast from mmif.utils.workflow_helper import generate_workflow_identifier, describe_single_mmif, \ describe_mmif_collection @@ -22,7 +22,7 @@ def generate_pipeline_identifier(mmif_file: Union[str, Path]) -> str: import warnings warnings.warn("generate_pipeline_identifier is deprecated, use generate_workflow_identifier instead", DeprecationWarning) - return generate_workflow_identifier(mmif_file) + return cast(str, generate_workflow_identifier(mmif_file)) def describe_argparser(): diff --git a/mmif/utils/workflow_helper.py b/mmif/utils/workflow_helper.py index 7980eb89..74206db6 100644 --- a/mmif/utils/workflow_helper.py +++ b/mmif/utils/workflow_helper.py @@ -73,9 +73,31 @@ def generate_param_hash(params: dict) -> str: return hashlib.md5(param_string.encode('utf-8')).hexdigest() -def generate_workflow_identifier(mmif_file: Union[str, Path]) -> str: +def _read_mmif_from_path(mmif_input: Union[str, Path, Mmif]) -> Mmif: """ - Generate a workflow identifier string from a MMIF file. + Helper function to get a Mmif object from various input types. + + :param mmif_input: Either a file path (str or Path) or an existing Mmif object + :return: Mmif object + :raises ValueError: If input is not a valid type + """ + if isinstance(mmif_input, Mmif): + return mmif_input + elif isinstance(mmif_input, (str, Path)): + with open(mmif_input, "r") as f: + mmif_str = f.read() + return Mmif(mmif_str) + else: + raise ValueError( + "MMIF input must be a string path, a Path object, or a Mmif object." + ) + + +def generate_workflow_identifier(mmif_input: Union[str, Path, Mmif], + return_param_dicts=False) \ + -> Union[str, Tuple[str, List[dict]]]: + """ + Generate a workflow identifier string from a MMIF file or object. The identifier follows the storage directory structure format: app_name/version/param_hash/app_name2/version2/param_hash2/... @@ -83,25 +105,18 @@ def generate_workflow_identifier(mmif_file: Union[str, Path]) -> str: Uses view.metadata.parameters (raw user-passed values) for hashing to ensure reproducibility. Views with errors or warnings are excluded from the identifier; empty views are included. - """ - if not isinstance(mmif_file, (str, Path)): - raise ValueError( - "MMIF file path must be a string or a Path object." - ) - - with open(mmif_file, "r") as f: - mmif_str = f.read() - data = Mmif(mmif_str) + :param mmif_input: Path to MMIF file (str or Path) or a Mmif object + :param return_param_dicts: If True, also return the parameter dictionaries + :return: Workflow identifier string, or tuple of (identifier, param_dicts) if return_param_dicts=True + """ + data = _read_mmif_from_path(mmif_input) segments = [] - # First prefix is source information, sorted by document type - sources = Counter(doc.at_type.shortname for doc in data.documents) - segments.append('-'.join([f'{k}-{sources[k]}' for k in sorted(sources.keys())])) - # Group views into runs grouped_apps = group_views_by_app(data.views) + param_dicts = [] for app_execution in grouped_apps: # Use the first view in the run as representative for metadata first_view = app_execution[0] @@ -120,6 +135,7 @@ def generate_workflow_identifier(mmif_file: Union[str, Path]) -> str: param_dict = first_view.metadata.parameters except (KeyError, AttributeError): param_dict = {} + param_dicts.append(param_dict) param_hash = generate_param_hash(param_dict) @@ -128,6 +144,8 @@ def generate_workflow_identifier(mmif_file: Union[str, Path]) -> str: version_str = app_version if app_version else "unversioned" segments.append(f"{name_str}/{version_str}/{param_hash}") + if return_param_dicts: + return '/'.join(segments), param_dicts return '/'.join(segments) @@ -159,9 +177,9 @@ def _get_profile_data(view) -> dict: return {"runningTimeMS": milliseconds} -def describe_single_mmif(mmif_file: Union[str, Path]) -> dict: +def describe_single_mmif(mmif_input: Union[str, Path, Mmif]) -> dict: """ - Reads a MMIF file and extracts the workflow specification from it. + Reads a MMIF file or object and extracts the workflow specification from it. This function provides an app-centric summarization of the workflow. The conceptual hierarchy is that a **workflow** is a sequence of **apps**, @@ -212,19 +230,11 @@ def describe_single_mmif(mmif_file: Union[str, Path]) -> dict: The docstring above is used to generate help messages for the CLI command. Do not remove the triple-dashed lines. - :param mmif_file: Path to the MMIF file + :param mmif_input: Path to MMIF file (str or Path) or a Mmif object :return: A dictionary containing the workflow specification. """ - if not isinstance(mmif_file, (str, Path)): - raise ValueError( - "MMIF file path must be a string or a Path object." - ) - - workflow_id = generate_workflow_identifier(mmif_file) - with open(mmif_file, "r") as f: - mmif_str = f.read() - - mmif = Mmif(mmif_str) + mmif = _read_mmif_from_path(mmif_input) + workflow_id = generate_workflow_identifier(mmif) error_view_ids = [] warning_view_ids = [] diff --git a/tests/test_utils.py b/tests/test_utils.py index 0c261fe7..5f29b9d2 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -276,9 +276,72 @@ def test_generate_workflow_identifier_grouped(self): try: workflow_id = workflow_helper.generate_workflow_identifier(tmp_file) segments = workflow_id.split('/') - self.assertEqual(len(segments), 7) - self.assertIn('app1', segments[1]) - self.assertIn('app2', segments[4]) + self.assertEqual(len(segments), 6) + self.assertIn('app1', segments[0]) + self.assertIn('app2', segments[3]) + finally: + os.unlink(tmp_file) + + def test_generate_workflow_identifier_with_mmif_object(self): + """Test that generate_workflow_identifier accepts Mmif objects directly.""" + from mmif.utils import workflow_helper + import os + + # Test with Mmif object directly + workflow_id_from_obj = workflow_helper.generate_workflow_identifier(self.basic_mmif) + + # Test with file path - should produce the same result + tmp_file = self.create_temp_mmif_file(self.basic_mmif) + try: + workflow_id_from_file = workflow_helper.generate_workflow_identifier(tmp_file) + self.assertEqual(workflow_id_from_obj, workflow_id_from_file) + finally: + os.unlink(tmp_file) + + def test_read_mmif_from_path(self): + """Test the _read_mmif_from_path helper function.""" + from mmif.utils.workflow_helper import _read_mmif_from_path + from pathlib import Path + import os + + # Test with Mmif object - should return as-is + result = _read_mmif_from_path(self.basic_mmif) + self.assertIs(result, self.basic_mmif) + + # Test with file path string + tmp_file = self.create_temp_mmif_file(self.basic_mmif) + try: + result_from_str = _read_mmif_from_path(tmp_file) + self.assertIsInstance(result_from_str, Mmif) + self.assertEqual(result_from_str.serialize(pretty=False), self.basic_mmif.serialize(pretty=False)) + + # Test with Path object + result_from_path = _read_mmif_from_path(Path(tmp_file)) + self.assertIsInstance(result_from_path, Mmif) + self.assertEqual(result_from_path.serialize(pretty=False), self.basic_mmif.serialize(pretty=False)) + finally: + os.unlink(tmp_file) + + # Test with invalid input + with pytest.raises(ValueError): + _read_mmif_from_path(12345) + + def test_describe_single_mmif_with_mmif_object(self): + """Test that describe_single_mmif accepts Mmif objects directly.""" + from mmif.utils.workflow_helper import describe_single_mmif + import os + + # Test with Mmif object directly + result_from_obj = describe_single_mmif(self.basic_mmif) + + # Test with file path - should produce the same result + tmp_file = self.create_temp_mmif_file(self.basic_mmif) + try: + result_from_file = describe_single_mmif(tmp_file) + self.assertEqual(result_from_obj, result_from_file) + self.assertIn('workflowId', result_from_obj) + self.assertIn('stats', result_from_obj) + self.assertIn('apps', result_from_obj) finally: os.unlink(tmp_file)