diff --git a/.github/workflows/prepare_test_data.yaml b/.github/workflows/prepare_test_data.yaml index e694d2de..34d15cca 100644 --- a/.github/workflows/prepare_test_data.yaml +++ b/.github/workflows/prepare_test_data.yaml @@ -4,7 +4,8 @@ on: schedule: - cron: "0 0 1 * *" # run once a month to prevent artifact expiration workflow_dispatch: -# uncomment and adjust the branch name if you need to add new datasets to the artifact + # Uncomment and adjust the branch name if you need to add new datasets to the artifact. + # It needs to be a branch in the spatialdata-io origin repository, not from a fork. # push: # branches: # - main @@ -57,6 +58,14 @@ jobs: # Spatial Genomics seqFISH v2 curl -O https://s3.embl.de/spatialdata/raw_data/seqfish-2-test-dataset.zip + # ------- + # MACSima OMAP datasets are licensed as CC BY 4.0 + # OMAP23 for format v1.x.x + curl -o OMAP23_small.zip "https://zenodo.org/api/records/18196452/files-archive" + + # OMAP10 for format v0.x.x + curl -o OMAP10_small.zip "https://zenodo.org/api/records/18196366/files-archive" + - name: Unzip files run: | cd ./data diff --git a/src/spatialdata_io/readers/macsima.py b/src/spatialdata_io/readers/macsima.py index 8bbe0532..41b94f6e 100644 --- a/src/spatialdata_io/readers/macsima.py +++ b/src/spatialdata_io/readers/macsima.py @@ -1,5 +1,7 @@ from __future__ import annotations +import os +import re import warnings from collections import defaultdict from copy import deepcopy @@ -13,6 +15,7 @@ import pandas as pd import spatialdata as sd from dask_image.imread import imread +from ome_types import OME, from_tiff from spatialdata import SpatialData from spatialdata._logging import logger @@ -28,6 +31,14 @@ __all__ = ["macsima"] +# Dictionary to harmonize imagetype across metadata versions +IMAGETYPE_DICT = { + "BleachCycle": "bleach", # v0 + "B": "bleach", # v1 + "AntigenCycle": "stain", # v0 + "S": "stain", # v1 +} + class MACSimaParsingStyle(ModeEnum): """Different parsing styles for MACSima data.""" @@ -44,6 
+55,12 @@ class ChannelMetadata: name: str cycle: int + imagetype: str + well: str + roi: int + fluorophore: str + exposure: float + clone: str | None = None # For example DAPI doesn't have a clone @dataclass @@ -61,39 +78,41 @@ def from_paths( imread_kwargs: Mapping[str, Any], skip_rounds: list[int] | None = None, ) -> MultiChannelImage: - cycles = [] - channels = [] + channel_metadata: list[ChannelMetadata] = [] for p in path_files: - cycle = parse_name_to_cycle(p.stem) - cycles.append(cycle) try: - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - channel_names = parse_channels(p) - if len(channel_names) > 1: - warnings.warn( - f"Found multiple channels in OME-TIFF file {p}. Only the first one will be used.", - UserWarning, - stacklevel=2, - ) - channels.append(channel_names[0]) + metadata = parse_metadata(p) except ValueError as e: warnings.warn( f"Cannot parse OME metadata from {p}. Error: {e}. Skipping this file.", UserWarning, stacklevel=2 ) + continue + + channel_metadata.append( + ChannelMetadata( + name=metadata["name"], + cycle=metadata["cycle"], + imagetype=metadata["imagetype"], + well=metadata["well"], + roi=metadata["roi"], + fluorophore=metadata["fluorophore"], + clone=metadata["clone"], + exposure=metadata["exposure"], + ) + ) - if len(path_files) != len(cycles) or len(path_files) != len(channels): - raise ValueError("Length of path_files, cycles and channels must be the same.") + if len(path_files) != len(channel_metadata): + raise ValueError("Length of path_files and metadata must be the same.") # if any of round_channels is in skip_rounds, remove that round from the list and from path_files if skip_rounds: logger.info(f"Skipping cycles: {skip_rounds}") - path_files, cycles, channels = map( + path_files, channel_metadata = map( list, zip( *[ - (p, c, ch) - for p, c, ch in zip(path_files, cycles, channels, strict=True) - if c not in skip_rounds + (p, ch_meta) + for p, ch_meta in zip(path_files, channel_metadata, strict=True) + if 
ch_meta.cycle not in skip_rounds ], strict=True, ), @@ -108,7 +127,7 @@ def from_paths( # create MultiChannelImage object with imgs and metadata output = cls( data=imgs, - metadata=[ChannelMetadata(name=ch, cycle=c) for c, ch in zip(cycles, channels, strict=True)], + metadata=channel_metadata, ) return output @@ -150,6 +169,30 @@ def get_cycles(self) -> list[int]: """Get the cycle numbers.""" return [c.cycle for c in self.metadata] + def get_image_types(self) -> list[str | None]: + """Get the staining types (stain or bleach).""" + return [c.imagetype for c in self.metadata] + + def get_wells(self) -> list[str | None]: + """Get the wells.""" + return [c.well for c in self.metadata] + + def get_rois(self) -> list[int | None]: + """Get the ROIs.""" + return [c.roi for c in self.metadata] + + def get_fluorophores(self) -> list[str | None]: + """Get the fluorophores.""" + return [c.fluorophore for c in self.metadata] + + def get_clones(self) -> list[str | None]: + """Get the clones.""" + return [c.clone for c in self.metadata] + + def get_exposures(self) -> list[float | None]: + """Get the exposures.""" + return [c.exposure for c in self.metadata] + def sort_by_channel(self) -> None: """Sort the channels by cycle number.""" self.data = [d for _, d in sorted(zip(self.metadata, self.data, strict=True), key=lambda x: x[0].cycle)] @@ -189,8 +232,7 @@ def macsima( ) -> SpatialData: """Read *MACSima* formatted dataset. - This function reads images from a MACSima cyclic imaging experiment. Metadata of the cycle rounds is parsed from - the image names. The channel names are parsed from the OME metadata. + This function reads images from a MACSima cyclic imaging experiment. Metadata is parsed from the OME metadata. .. 
seealso:: @@ -300,12 +342,257 @@ def macsima( raise NotImplementedError("Parsing raw MACSima data is not yet implemented.") -def parse_name_to_cycle(name: str) -> int: - """Parse the cycle number from the name of the image.""" - cycle = name.split("_")[0] - if "-" in cycle: - cycle = cycle.split("-")[1] - return int(cycle) +def _collect_map_annotation_values(ome: OME) -> dict[str, Any]: + """Collapse structured_annotations from OME into dictionary. + + Collects all key/value pairs from all map_annotations in structured_annotations into a single flat dictionary. + If a key appears multiple times across annotations, the *first* + occurrence wins and later occurrences are ignored. + """ + merged: dict[str, Any] = {} + + sa = getattr(ome, "structured_annotations", None) + map_annotations = getattr(sa, "map_annotations", []) if sa else [] + + for ma in map_annotations: + raw_value = ma.value + value = raw_value.dict() + + for k, v in value.items(): + if k not in merged: + merged[k] = v + else: + # We do expect repeated keys with different values, because the same key is reused for different annotations. + # But the order is fixed and fine for what we need. + # Therefore log this for debugging, if it becomes a problem, but don't throw warnings to the user. + if v != merged[k]: + logger.debug( + f"Found different value for {k}: {v}. The parser will only use the first found value, which is {merged[k]}!" + ) + + return merged + + +def _get_software_version(ma_values: dict[str, Any]) -> str: + """Extract the software version string from the flattened map-annotation values. 
+ + Supports both: + - 'Software version' (v0) + - 'SoftwareVersion' (v1) + """ + for key in ("SoftwareVersion", "Software version"): + v = ma_values.get(key) + if isinstance(v, str) and v.strip(): + return v.strip() + + raise ValueError("Could not extract Software Version from OME metadata.") + + +def _get_software_major_version(version: str) -> int: + """Parse the major component of a semantic version string.""" + s = version.strip() + if s.startswith(("v", "V")): + s = s[1:] + parts = s.split(".") + if not parts: + raise ValueError("Could not extract major software version part from version string.") + + major = int(parts[0]) + logger.debug(f"Found major software version {major}") + + return major + + +def _parse_v0_ome_metadata(ome: OME) -> dict[str, Any]: + """Parse Legacy Format of OME Metadata (software version 0.x.x).""" + logger.debug("Parsing OME metadata expecting version 0 format") + + metadata: dict[str, Any] = { + "name": None, + "clone": None, + "fluorophore": None, + "cycle": None, + "imagetype": None, + "well": None, + "roi": None, + "exposure": None, + } + + antigen = None + clone = None + + if ome.screens: + screen0 = ome.screens[0] + reagents = getattr(screen0, "reagents", []) + if reagents: + r0 = reagents[0] + name = getattr(r0, "name", None) + if isinstance(name, str) and name: + if "__" in name: + antigen, clone = name.split("__", 1) + else: + antigen = name + clone = None + + metadata["name"] = antigen + metadata["clone"] = clone + + ma_values = _collect_map_annotation_values(ome) + + if "Fluorochrome" in ma_values: + metadata["fluorophore"] = ma_values["Fluorochrome"] + + if "Exposure time" in ma_values: + exp_time = ma_values["Exposure time"] + try: + metadata["exposure"] = float(exp_time) + except (TypeError, ValueError): + metadata["exposure"] = None + + if "Cycle" in ma_values: + cyc = ma_values["Cycle"] + try: + metadata["cycle"] = int(cyc) + except (TypeError, ValueError): + metadata["cycle"] = None + + if "ROI ID" in ma_values: + 
roi = ma_values["ROI ID"] + try: + metadata["roi"] = int(roi) + except (TypeError, ValueError): + metadata["roi"] = None + + if "MICS cycle type" in ma_values: + metadata["imagetype"] = ma_values["MICS cycle type"] + + well = None + if ome.plates: + plate0 = ome.plates[0] + wells = getattr(plate0, "wells", []) + if wells: + w0 = wells[0] + ext_id = getattr(w0, "external_identifier", None) + if isinstance(ext_id, str) and ext_id: + well = ext_id + + metadata["well"] = well + + # Add _background suffix to marker name of bleach images, to distinguish them from stain image + if metadata["imagetype"] == "BleachCycle": + metadata["name"] = metadata["name"] + "_background" + + # Harmonize imagetype across versions + if metadata["imagetype"]: + metadata["imagetype"] = IMAGETYPE_DICT[metadata["imagetype"]] + + return metadata + + +def _parse_v1_ome_metadata(ome: OME) -> dict[str, Any]: + """Parse v1 format of OME metadata (software version 1.x.x).""" + logger.debug("Parsing OME metadata expecting version 1 format") + + metadata: dict[str, Any] = { + "name": None, + "clone": None, + "fluorophore": None, + "cycle": None, + "imagetype": None, + "well": None, + "roi": None, + "exposure": None, + } + + ma_values = _collect_map_annotation_values(ome) + + if "Clone" in ma_values: + metadata["clone"] = ma_values["Clone"] + + antigen_name = None + if "Biomarker" in ma_values and ma_values["Biomarker"]: + antigen_name = ma_values["Biomarker"] + elif "Dye" in ma_values and ma_values["Dye"]: + antigen_name = ma_values["Dye"] + + metadata["name"] = antigen_name + + if "Fluorochrome" in ma_values and ma_values["Fluorochrome"]: + metadata["fluorophore"] = ma_values["Fluorochrome"] + elif "Dye" in ma_values and ma_values["Dye"]: + metadata["fluorophore"] = ma_values["Dye"] + + if "ExposureTime" in ma_values: + exp_time = ma_values["ExposureTime"] + try: + metadata["exposure"] = float(exp_time) + except (TypeError, ValueError): + metadata["exposure"] = None + + if "Cycle" in ma_values: + 
cyc = ma_values["Cycle"] + try: + metadata["cycle"] = int(cyc) + except (TypeError, ValueError): + metadata["cycle"] = None + + if "RoiId" in ma_values: + roi = ma_values["RoiId"] + try: + metadata["roi"] = int(roi) + except (TypeError, ValueError): + metadata["roi"] = None + + if "ScanType" in ma_values: + metadata["imagetype"] = ma_values["ScanType"] + + well = None + if ome.plates: + plate0 = ome.plates[0] + wells = getattr(plate0, "wells", []) + if wells: + w0 = wells[0] + ext_id = getattr(w0, "external_identifier", None) + if isinstance(ext_id, str) and ext_id: + well = ext_id + + metadata["well"] = well + + # Add _background suffix to marker name of bleach images, to distinguish them from stain image + if metadata["imagetype"] == "B": + metadata["name"] = metadata["name"] + "_background" + + # Harmonize imagetype across versions + if metadata["imagetype"]: + metadata["imagetype"] = IMAGETYPE_DICT[metadata["imagetype"]] + + return metadata + + +def _parse_ome_metadata(ome: OME) -> dict[str, Any]: + """Extract the software version from OME metadata and parse with appropriate parser.""" + ma_values = _collect_map_annotation_values(ome) + version_str = _get_software_version(ma_values) + major = _get_software_major_version(version_str) + + if major == 0: + return _parse_v0_ome_metadata(ome) + elif major == 1: + return _parse_v1_ome_metadata(ome) + else: + raise ValueError("Unknown software version, cannot determine parser") + + +def parse_metadata(path: Path) -> dict[str, Any]: + """Parse metadata for a file. + + All metadata is extracted from the OME metadata. 
+ """ + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + ome = from_tiff(path) + + return _parse_ome_metadata(ome) def parse_processed_folder( @@ -326,8 +613,7 @@ def parse_processed_folder( include_cycle_in_channel_name: bool = False, ) -> SpatialData: """Parse a single folder containing images from a cyclical imaging platform.""" - # get list of image paths, get channel name from OME data and cycle round number from filename - # look for OME-TIFF files + # get list of image paths, look for OME-TIFF files # TODO: replace this pattern and the p.suffix in [".tif", ".tiff"] with a single function based on a regexp, like # this one re.compile(r".*\.tif{1,2}$", re.IGNORECASE) path_files = list(path.glob(file_pattern)) @@ -396,7 +682,9 @@ def create_sdata( [i for i in range(len(mci.metadata)) if i not in nuclei_idx_without_first_and_last], ) - pixels_to_microns = parse_physical_size(path_files[0]) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + pixels_to_microns = parse_physical_size(path_files[0]) image_element = create_image_element( mci, @@ -440,12 +728,30 @@ def create_sdata( def create_table(mci: MultiChannelImage) -> ad.AnnData: cycles = mci.get_cycles() names = mci.get_channel_names() + imagetypes = mci.get_image_types() + wells = mci.get_wells() + rois = mci.get_rois() + fluorophores = mci.get_fluorophores() + clones = mci.get_clones() + exposures = mci.get_exposures() + df = pd.DataFrame( { "name": names, "cycle": cycles, + "imagetype": imagetypes, + "well": wells, + "ROI": rois, + "fluorophore": fluorophores, + "clone": clones, + "exposure": exposures, } ) + + # Replace missing data. This happens mostly in the clone column. 
+ df = df.replace({None: pd.NA, "": pd.NA}) + df.index = df.index.astype(str) + table = ad.AnnData(var=df) table.var_names = names return sd.models.TableModel.parse(table) diff --git a/tests/test_macsima.py b/tests/test_macsima.py index 63d6ca24..73723b19 100644 --- a/tests/test_macsima.py +++ b/tests/test_macsima.py @@ -5,8 +5,18 @@ from typing import Any import dask.array as da +import pandas as pd import pytest from click.testing import CliRunner +from ome_types import OME +from ome_types.model import ( + MapAnnotation, + Plate, + Reagent, + Screen, + StructuredAnnotations, + Well, +) from spatialdata import read_zarr from spatialdata.models import get_channel_names @@ -14,27 +24,55 @@ from spatialdata_io.readers.macsima import ( ChannelMetadata, MultiChannelImage, + _collect_map_annotation_values, + _get_software_major_version, + _get_software_version, + _parse_ome_metadata, + _parse_v0_ome_metadata, + _parse_v1_ome_metadata, macsima, - parse_name_to_cycle, ) from tests._utils import skip_if_below_python_version RNG = da.random.default_rng(seed=0) -if not (Path("./data/Lung_adc_demo").exists() or Path("./data/MACSimaData_HCA").exists()): +if not (Path("./data/OMAP10_small").exists() or Path("./data/OMAP23_small").exists()): pytest.skip( - "Requires the Lung_adc_demo or MACSimaData_HCA datasets, please check " - "https://github.com/giovp/spatialdata-sandbox/macsima/Readme.md for instructions on how to get the data.", + "Requires the OMAP10 or OMAP23 datasets. 
" + "The small OMAP10 dataset can be downloaded from TBD, for the full data see https://zenodo.org/records/7875938 " + "The small OMAP23 dataset can be downloaded from TBD, for the full data set see https://zenodo.org/records/14008816", allow_module_level=True, ) +# Helper to create ChannelMetadata with some defaults +def make_ChannelMetadata( + name: str, + cycle: int, + fluorophore: str | None = None, + exposure: float | None = None, + imagetype: str | None = None, + well: str | None = None, + roi: int | None = None, +) -> ChannelMetadata: + """Helper to construct ChannelMetadata with required defaults.""" + return ChannelMetadata( + name=name, + cycle=cycle, + fluorophore=fluorophore or "", + exposure=exposure if exposure is not None else 0.0, + imagetype=imagetype or "StainCycle", + well=well or "A01", + roi=roi if roi is not None else 0, + ) + + @skip_if_below_python_version() @pytest.mark.parametrize( "dataset,expected", [ - ("Lung_adc_demo", {"y": (0, 15460), "x": (0, 13864)}), - ("MACSimaData_HCA/HumanLiverH35", {"y": (0, 1154), "x": (0, 1396)}), + ("OMAP10_small", {"y": (0, 77), "x": (0, 94)}), + ("OMAP23_small", {"y": (0, 77), "x": (0, 93)}), ], ) def test_image_size(dataset: str, expected: dict[str, Any]) -> None: @@ -42,7 +80,7 @@ def test_image_size(dataset: str, expected: dict[str, Any]) -> None: f = Path("./data") / dataset assert f.is_dir() - sdata = macsima(f) + sdata = macsima(f, transformations=False) # Do not transform to make it easier to compare against pixel dimensions el = sdata[list(sdata.images.keys())[0]] cs = sdata.coordinate_systems[0] @@ -54,7 +92,7 @@ def test_image_size(dataset: str, expected: dict[str, Any]) -> None: @skip_if_below_python_version() @pytest.mark.parametrize( "dataset,expected", - [("Lung_adc_demo", 116), ("MACSimaData_HCA/HumanLiverH35", 102)], + [("OMAP10_small", 4), ("OMAP23_small", 5)], ) def test_total_channels(dataset: str, expected: int) -> None: f = Path("./data") / dataset @@ -71,14 +109,14 @@ def 
test_total_channels(dataset: str, expected: int) -> None: @pytest.mark.parametrize( "dataset,expected", [ - ("Lung_adc_demo", ["R0 DAPI", "R1 CD68", "R1 CD163"]), - ("MACSimaData_HCA/HumanLiverH35", ["R0 DAPI", "R1 PE", "R1 DAPI"]), + ("OMAP10_small", ["R1 DAPI", "R1 CD15", "R2 Bcl 2", "R2 CD1c"]), + ("OMAP23_small", ["R1 DAPI", "R1 CD3", "R2 CD279", "R4 CD66b", "R15 DAPI_background"]), ], ) -def test_channel_names(dataset: str, expected: list[str]) -> None: +def test_channel_names_with_cycle_in_name(dataset: str, expected: list[str]) -> None: f = Path("./data") / dataset assert f.is_dir() - sdata = macsima(f, c_subset=3, include_cycle_in_channel_name=True) + sdata = macsima(f, include_cycle_in_channel_name=True) el = sdata[list(sdata.images.keys())[0]] # get the channel names @@ -90,8 +128,8 @@ def test_channel_names(dataset: str, expected: list[str]) -> None: @pytest.mark.parametrize( "dataset,expected", [ - ("Lung_adc_demo", 68), - ("MACSimaData_HCA/HumanLiverH35", 51), + ("OMAP10_small", 2), + ("OMAP23_small", 15), ], ) def test_total_rounds(dataset: str, expected: list[int]) -> None: @@ -107,11 +145,11 @@ def test_total_rounds(dataset: str, expected: list[int]) -> None: @pytest.mark.parametrize( "dataset,skip_rounds,expected", [ - ("Lung_adc_demo", list(range(2, 68)), ["DAPI (1)", "CD68", "CD163", "DAPI (2)", "Control"]), + ("OMAP10_small", list(range(2, 4)), ["DAPI", "CD15"]), ( - "MACSimaData_HCA/HumanLiverH35", - list(range(2, 51)), - ["DAPI (1)", "PE", "CD14", "Vimentin", "DAPI (2)", "WT1"], + "OMAP23_small", + list(range(2, 16)), + ["DAPI", "CD3"], ), ], ) @@ -126,23 +164,66 @@ def test_skip_rounds(dataset: str, skip_rounds: list[int], expected: list[str]) assert list(channels) == expected, f"Expected {expected}, got {list(channels)}" +METADATA_COLUMN_ORDER = [ + "cycle", + "imagetype", + "well", + "ROI", + "fluorophore", + "clone", + "exposure", +] + +EXPECTED_METADATA_OMAP10 = pd.DataFrame( + { + "name": ["DAPI", "CD15", "Bcl 2", "CD1c"], + "cycle": [1, 
1, 2, 2], + "imagetype": ["stain", "stain", "stain", "stain"], + "well": ["C-1", "C-1", "C-1", "C-1"], + "ROI": [1, 1, 1, 1], + "fluorophore": ["DAPI", "APC", "FITC", "PE"], + "clone": [pd.NA, "VIMC6", "REA872", "REA694"], + "exposure": [40.0, 2304.0, 96.0, 144.0], + }, + index=["DAPI", "CD15", "Bcl 2", "CD1c"], + columns=METADATA_COLUMN_ORDER, +) + + +EXPECTED_METADATA_OMAP23 = pd.DataFrame( + { + "name": ["DAPI", "CD3", "CD279", "CD66b", "DAPI_background"], + "cycle": [1, 1, 2, 4, 15], + "imagetype": ["stain", "stain", "stain", "stain", "bleach"], + "well": ["D01", "D01", "D01", "D01", "D01"], + "ROI": [1, 1, 1, 1, 1], + "fluorophore": ["DAPI", "APC", "PE", "FITC", "DAPI"], + "clone": [pd.NA, "REA1151", "REA1165", "REA306", pd.NA], + "exposure": [51.0, 1212.52, 322.12, 856.68, 51.0], + }, + index=["DAPI", "CD3", "CD279", "CD66b", "DAPI_background"], + columns=METADATA_COLUMN_ORDER, +) + + @skip_if_below_python_version() @pytest.mark.parametrize( - "dataset,expected", + "dataset,expected_df", [ - ("Lung_adc_demo", [0, 1, 1]), - ("MACSimaData_HCA/HumanLiverH35", [0, 1, 1]), + ("OMAP10_small", EXPECTED_METADATA_OMAP10), + ("OMAP23_small", EXPECTED_METADATA_OMAP23), ], ) -def test_cycle_metadata(dataset: str, expected: list[str]) -> None: +def test_metadata_table(dataset: str, expected_df: pd.DataFrame) -> None: f = Path("./data") / dataset assert f.is_dir() - sdata = macsima(f, c_subset=3) + sdata = macsima(f) table = sdata[list(sdata.tables.keys())[0]] - # get the channel names - cycles = table.var["cycle"] - assert list(cycles) == expected + # Convert table.var to a DataFrame and align to expected columns + actual = table.var[METADATA_COLUMN_ORDER] + + pd.testing.assert_frame_equal(actual, expected_df) def test_parsing_style() -> None: @@ -150,25 +231,15 @@ def test_parsing_style() -> None: macsima(Path(), parsing_style="not_a_parsing_style") -@pytest.mark.parametrize( - "name,expected", - [ - ("C-002_S-000_S_FITC_R-01_W-C-1_ROI-01_A-CD147_C-REA282.tif", 2), - 
("001_S_R-01_W-B-1_ROI-01_A-CD14REA599ROI1_C-REA599.ome.tif", 1), - ], -) -def test_parsing_of_name_to_cycle(name: str, expected: int) -> None: - result = parse_name_to_cycle(name) - assert result == expected - - def test_mci_sort_by_channel() -> None: sizes = [100, 200, 300] c_names = ["test11", "test3", "test2"] cycles = [2, 0, 1] mci = MultiChannelImage( data=[RNG.random((size, size), chunks=(10, 10)) for size in sizes], - metadata=[ChannelMetadata(name=c_name, cycle=cycle) for c_name, cycle in zip(c_names, cycles, strict=False)], + metadata=[ + make_ChannelMetadata(name=c_name, cycle=cycle) for c_name, cycle in zip(c_names, cycles, strict=False) + ], ) assert mci.get_channel_names() == c_names assert [x.shape[0] for x in mci.data] == sizes @@ -182,7 +253,7 @@ def test_mci_array_reference() -> None: arr2 = RNG.random((200, 200), chunks=(10, 10)) mci = MultiChannelImage( data=[arr1, arr2], - metadata=[ChannelMetadata(name="test1", cycle=0), ChannelMetadata(name="test2", cycle=1)], + metadata=[make_ChannelMetadata(name="test1", cycle=0), make_ChannelMetadata(name="test2", cycle=1)], ) orig_arr1 = arr1.copy() @@ -206,8 +277,8 @@ def test_mci_array_reference() -> None: @skip_if_below_python_version() -@pytest.mark.parametrize("dataset", ["Lung_adc_demo", "MACSimaData_HCA/HumanLiverH35"]) -def test_cli_macimsa(runner: CliRunner, dataset: str) -> None: +@pytest.mark.parametrize("dataset", ["OMAP10_small", "OMAP23_small"]) +def test_cli_macsima(runner: CliRunner, dataset: str) -> None: f = Path("./data") / dataset assert f.is_dir() with TemporaryDirectory() as tmpdir: @@ -229,3 +300,274 @@ def test_cli_macimsa(runner: CliRunner, dataset: str) -> None: ) assert result.exit_code == 0, result.output _ = read_zarr(output_zarr) + + +def test_collect_map_annotation_values_with_no_duplicate_keys() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation(value={"a": "1", "b": "2"}), + MapAnnotation(value={"c": "3"}), + ] + ) + ) 
+ + result = _collect_map_annotation_values(ome) + + assert result == {"a": "1", "b": "2", "c": "3"} + + +def test_collect_map_annotations_values_with_duplicate_keys_identical_values() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation(value={"a": "1", "b": "2"}), + MapAnnotation(value={"b": "2", "c": "3"}), + ] + ) + ) + + result = _collect_map_annotation_values(ome) + # Key should only be returned once + assert result == {"a": "1", "b": "2", "c": "3"} + + +def test_collect_map_annotations_values_with_duplicate_keys_different_values() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation(value={"a": "1", "b": "2"}), + MapAnnotation(value={"b": "99", "c": "3"}), + ] + ) + ) + import re + + result = _collect_map_annotation_values(ome) + + # The parser should return only the first found value. + assert result == {"a": "1", "b": "2", "c": "3"} + + +def test_collect_map_annotation_values_handles_missing_sa_and_empty_list() -> None: + # No structured_annotations at all + ome1 = OME() + assert _collect_map_annotation_values(ome1) == {} + + # structured_annotations present but with empty map_annotation list + ome2 = OME(structured_annotations=StructuredAnnotations(map_annotations=[])) + assert _collect_map_annotation_values(ome2) == {} + + +@pytest.mark.parametrize( + "ma_values, expected", + [ + ({"SoftwareVersion": " 1.2.3 "}, "1.2.3"), + ({"Software version": " v0.9.0"}, "v0.9.0"), + ], +) +def test_get_software_version_success(ma_values: dict[str, str], expected: str) -> None: + assert _get_software_version(ma_values) == expected + + +@pytest.mark.parametrize( + "ma_values", + [ + ({}), + ({"SoftwareVersion": ""}), + ({"SoftwareVersion": " "}), + ({"Software version": ""}), + ({"Software version": None}), + ], +) +def test_get_software_version_failure(ma_values: dict[str, str | None]) -> None: + with pytest.raises(ValueError, match="Could not extract Software 
Version"): + _get_software_version(ma_values) + + +@pytest.mark.parametrize( + "version, expected", + [ + ("1.2.3", 1), + (" 2.0.0 ", 2), + ("v3.4.5", 3), + ("V4.0.1", 4), + ("10", 10), + ], +) +def test_get_software_major_version_success(version: str, expected: int) -> None: + assert _get_software_major_version(version) == expected + + +def test_get_software_major_version_failure() -> None: + with pytest.raises(ValueError): + _get_software_major_version("") + + +def test_parse_v0_ome_metadata_basic_extraction_and_conversions() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation( + value={ + "Fluorochrome": "AF488", + "Exposure time": "123.4", + "Cycle": "5", + "ROI ID": "7", + "MICS cycle type": "AntigenCycle", + } + ) + ] + ), + screens=[Screen(reagents=[Reagent(name="CD3__OKT3")])], + plates=[Plate(wells=[Well(column=0, row=0, external_identifier="A01")])], + ) + + md = _parse_v0_ome_metadata(ome) + + assert md["name"] == "CD3" + assert md["clone"] == "OKT3" + assert md["fluorophore"] == "AF488" + assert md["exposure"] == pytest.approx(123.4) + assert md["cycle"] == 5 + assert md["roi"] == 7 + assert md["imagetype"] == "stain" # harmonized! 
+ assert md["well"] == "A01" + + +def test_parse_v0_ome_metadata_handles_missing_or_invalid_numeric_fields() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation( + value={ + "Exposure time": "not-a-number", + "Cycle": "NaN", + "ROI ID": "x", + } + ) + ] + ), + screens=[Screen(reagents=[Reagent(name="MarkerOnly")])], + plates=[Plate(wells=[Well(column=0, row=0, external_identifier=None)])], + ) + + md = _parse_v0_ome_metadata(ome) + + # name from reagent without "__" + assert md["name"] == "MarkerOnly" + assert md["clone"] is None + assert md["exposure"] is None + assert md["cycle"] is None + assert md["roi"] is None + # well remains None + assert md["well"] is None + + +def test_parse_v0_ome_metadata_bleach_cycle_appends_background() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation( + value={ + "MICS cycle type": "BleachCycle", + } + ) + ] + ), + screens=[Screen(reagents=[Reagent(name="CD4__RPA-T4")])], + ) + + md = _parse_v0_ome_metadata(ome) + + assert md["imagetype"] == "bleach" # harmonized! + assert md["name"] == "CD4_background" + + +def test_parse_v1_ome_metadata_basic_extraction_and_conversions() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation( + value={ + "Clone": "OKT3", + "Biomarker": "CD3", + "Fluorochrome": "AF488", + "ExposureTime": "45.6", + "Cycle": "3", + "RoiId": "10", + "ScanType": "S", + } + ) + ] + ), + plates=[Plate(wells=[Well(column=0, row=0, external_identifier="B02")])], + ) + + md = _parse_v1_ome_metadata(ome) + + assert md["name"] == "CD3" + assert md["clone"] == "OKT3" + assert md["fluorophore"] == "AF488" + assert md["exposure"] == pytest.approx(45.6) + assert md["cycle"] == 3 + assert md["roi"] == 10 + assert md["imagetype"] == "stain" # harmonized! 
+ assert md["well"] == "B02" + + +def test_parse_v1_ome_metadata_invalid_numerics_become_none() -> None: + ome = OME( + structured_annotations=StructuredAnnotations( + map_annotations=[ + MapAnnotation( + value={ + "ExposureTime": "x", + "Cycle": "NaN", + "RoiId": "ABC", + } + ) + ] + ), + ) + + md = _parse_v1_ome_metadata(ome) + + assert md["exposure"] is None + assert md["cycle"] is None + assert md["roi"] is None + + +def make_ome_with_version(version_value: str, extra_ma: dict[str, Any] | None = None) -> OME: + base = {"SoftwareVersion": version_value} + if extra_ma: + base.update(extra_ma) + return OME(structured_annotations=StructuredAnnotations(map_annotations=[MapAnnotation(value=base)])) + + +def test_parse_ome_metadata_dispatches_to_v0() -> None: + ome = make_ome_with_version("0.9.0") + # enrich some so v0 parser has something to see + ome.screens = [Screen(reagents=[Reagent(name="Marker0")])] + + md = _parse_ome_metadata(ome) + + # Assert that the v0 parser was used by checking a field + # that can only come from v0 parsing logic (e.g. name from reagent) + assert "name" in md + assert md["name"] == "Marker0" + + +def test_parse_ome_metadata_dispatches_to_v1() -> None: + ome = make_ome_with_version("1.0.0", extra_ma={"Biomarker": "CD3"}) + + md = _parse_ome_metadata(ome) + + assert md["name"] == "CD3" + + +def test_parse_ome_metadata_unknown_major_raises() -> None: + ome = make_ome_with_version("2.0.0") + + with pytest.raises(ValueError, match="Unknown software version"): + _parse_ome_metadata(ome)