Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mmif/serialize/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def add_property(self, name: str,

With the former method, the SDK will record the added property as a
`Annotation` annotation object, separate from the original `Document`
object. See :meth:`.Mmif.generate_capital_annotations()` for more.
object. See :meth:`mmif.serialize.Mmif.generate_capital_annotations()` for more.

A few notes to keep in mind:

Expand Down
4 changes: 2 additions & 2 deletions mmif/utils/cli/describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sys
import textwrap
from pathlib import Path
from typing import Union
from typing import Union, cast

from mmif.utils.workflow_helper import generate_workflow_identifier, describe_single_mmif, \
describe_mmif_collection
Expand All @@ -22,7 +22,7 @@ def generate_pipeline_identifier(mmif_file: Union[str, Path]) -> str:
import warnings
warnings.warn("generate_pipeline_identifier is deprecated, use generate_workflow_identifier instead",
DeprecationWarning)
return generate_workflow_identifier(mmif_file)
return cast(str, generate_workflow_identifier(mmif_file))


def describe_argparser():
Expand Down
66 changes: 38 additions & 28 deletions mmif/utils/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,35 +73,50 @@ def generate_param_hash(params: dict) -> str:
return hashlib.md5(param_string.encode('utf-8')).hexdigest()


def generate_workflow_identifier(mmif_file: Union[str, Path]) -> str:
def _read_mmif_from_path(mmif_input: Union[str, Path, Mmif]) -> Mmif:
"""
Generate a workflow identifier string from a MMIF file.
Helper function to get a Mmif object from various input types.

:param mmif_input: Either a file path (str or Path) or an existing Mmif object
:return: Mmif object
:raises ValueError: If input is not a valid type
"""
if isinstance(mmif_input, Mmif):
return mmif_input
elif isinstance(mmif_input, (str, Path)):
with open(mmif_input, "r") as f:
mmif_str = f.read()
return Mmif(mmif_str)
else:
raise ValueError(
"MMIF input must be a string path, a Path object, or a Mmif object."
)


def generate_workflow_identifier(mmif_input: Union[str, Path, Mmif],
return_param_dicts=False) \
-> Union[str, Tuple[str, List[dict]]]:
"""
Generate a workflow identifier string from a MMIF file or object.

The identifier follows the storage directory structure format:
app_name/version/param_hash/app_name2/version2/param_hash2/...

Uses view.metadata.parameters (raw user-passed values) for hashing
to ensure reproducibility. Views with errors or warnings are excluded
from the identifier; empty views are included.
"""
if not isinstance(mmif_file, (str, Path)):
raise ValueError(
"MMIF file path must be a string or a Path object."
)

with open(mmif_file, "r") as f:
mmif_str = f.read()

data = Mmif(mmif_str)
:param mmif_input: Path to MMIF file (str or Path) or a Mmif object
:param return_param_dicts: If True, also return the parameter dictionaries
:return: Workflow identifier string, or tuple of (identifier, param_dicts) if return_param_dicts=True
"""
data = _read_mmif_from_path(mmif_input)
segments = []

# First prefix is source information, sorted by document type
sources = Counter(doc.at_type.shortname for doc in data.documents)
segments.append('-'.join([f'{k}-{sources[k]}' for k in sorted(sources.keys())]))

# Group views into runs
grouped_apps = group_views_by_app(data.views)

param_dicts = []
for app_execution in grouped_apps:
# Use the first view in the run as representative for metadata
first_view = app_execution[0]
Expand All @@ -120,6 +135,7 @@ def generate_workflow_identifier(mmif_file: Union[str, Path]) -> str:
param_dict = first_view.metadata.parameters
except (KeyError, AttributeError):
param_dict = {}
param_dicts.append(param_dict)

param_hash = generate_param_hash(param_dict)

Expand All @@ -128,6 +144,8 @@ def generate_workflow_identifier(mmif_file: Union[str, Path]) -> str:
version_str = app_version if app_version else "unversioned"
segments.append(f"{name_str}/{version_str}/{param_hash}")

if return_param_dicts:
return '/'.join(segments), param_dicts
return '/'.join(segments)


Expand Down Expand Up @@ -159,9 +177,9 @@ def _get_profile_data(view) -> dict:
return {"runningTimeMS": milliseconds}


def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
def describe_single_mmif(mmif_input: Union[str, Path, Mmif]) -> dict:
"""
Reads a MMIF file and extracts the workflow specification from it.
Reads a MMIF file or object and extracts the workflow specification from it.

This function provides an app-centric summarization of the workflow. The
conceptual hierarchy is that a **workflow** is a sequence of **apps**,
Expand Down Expand Up @@ -212,19 +230,11 @@ def describe_single_mmif(mmif_file: Union[str, Path]) -> dict:
The docstring above is used to generate help messages for the CLI command.
Do not remove the triple-dashed lines.

:param mmif_file: Path to the MMIF file
:param mmif_input: Path to MMIF file (str or Path) or a Mmif object
:return: A dictionary containing the workflow specification.
"""
if not isinstance(mmif_file, (str, Path)):
raise ValueError(
"MMIF file path must be a string or a Path object."
)

workflow_id = generate_workflow_identifier(mmif_file)
with open(mmif_file, "r") as f:
mmif_str = f.read()

mmif = Mmif(mmif_str)
mmif = _read_mmif_from_path(mmif_input)
workflow_id = generate_workflow_identifier(mmif)

error_view_ids = []
warning_view_ids = []
Expand Down
69 changes: 66 additions & 3 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,72 @@ def test_generate_workflow_identifier_grouped(self):
try:
workflow_id = workflow_helper.generate_workflow_identifier(tmp_file)
segments = workflow_id.split('/')
self.assertEqual(len(segments), 7)
self.assertIn('app1', segments[1])
self.assertIn('app2', segments[4])
self.assertEqual(len(segments), 6)
self.assertIn('app1', segments[0])
self.assertIn('app2', segments[3])
finally:
os.unlink(tmp_file)

def test_generate_workflow_identifier_with_mmif_object(self):
"""Test that generate_workflow_identifier accepts Mmif objects directly."""
from mmif.utils import workflow_helper
import os

# Test with Mmif object directly
workflow_id_from_obj = workflow_helper.generate_workflow_identifier(self.basic_mmif)

# Test with file path - should produce the same result
tmp_file = self.create_temp_mmif_file(self.basic_mmif)
try:
workflow_id_from_file = workflow_helper.generate_workflow_identifier(tmp_file)
self.assertEqual(workflow_id_from_obj, workflow_id_from_file)
finally:
os.unlink(tmp_file)

def test_read_mmif_from_path(self):
"""Test the _read_mmif_from_path helper function."""
from mmif.utils.workflow_helper import _read_mmif_from_path
from pathlib import Path
import os

# Test with Mmif object - should return as-is
result = _read_mmif_from_path(self.basic_mmif)
self.assertIs(result, self.basic_mmif)

# Test with file path string
tmp_file = self.create_temp_mmif_file(self.basic_mmif)
try:
result_from_str = _read_mmif_from_path(tmp_file)
self.assertIsInstance(result_from_str, Mmif)
self.assertEqual(result_from_str.serialize(pretty=False), self.basic_mmif.serialize(pretty=False))

# Test with Path object
result_from_path = _read_mmif_from_path(Path(tmp_file))
self.assertIsInstance(result_from_path, Mmif)
self.assertEqual(result_from_path.serialize(pretty=False), self.basic_mmif.serialize(pretty=False))
finally:
os.unlink(tmp_file)

# Test with invalid input
with pytest.raises(ValueError):
_read_mmif_from_path(12345)

def test_describe_single_mmif_with_mmif_object(self):
"""Test that describe_single_mmif accepts Mmif objects directly."""
from mmif.utils.workflow_helper import describe_single_mmif
import os

# Test with Mmif object directly
result_from_obj = describe_single_mmif(self.basic_mmif)

# Test with file path - should produce the same result
tmp_file = self.create_temp_mmif_file(self.basic_mmif)
try:
result_from_file = describe_single_mmif(tmp_file)
self.assertEqual(result_from_obj, result_from_file)
self.assertIn('workflowId', result_from_obj)
self.assertIn('stats', result_from_obj)
self.assertIn('apps', result_from_obj)
finally:
os.unlink(tmp_file)

Expand Down
Loading