diff --git a/examples/zarr_arr.py b/examples/zarr_arr.py index bee4eb09..0c6790a2 100644 --- a/examples/zarr_arr.py +++ b/examples/zarr_arr.py @@ -20,5 +20,4 @@ except ImportError: raise ImportError("Please `pip install zarr aiohttp` to run this example") - -ndv.imshow(zarr_arr["s4"], current_index={1: 30}, visible_axes=(0, 2)) +ndv.imshow(zarr_arr["s4"].astype("uint16"), current_index={1: 30}, visible_axes=(0, 2)) diff --git a/pyproject.toml b/pyproject.toml index 49ede12f..2d6003dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,9 @@ dependencies = [ "ihist>=0.1.3", ] +[project.scripts] +ndv = "ndv.cli:main" + # https://peps.python.org/pep-0621/#dependencies-optional-dependencies [project.optional-dependencies] # Supported GUI frontends @@ -63,10 +66,27 @@ wxpython = ["pyconify>=0.2.1", "wxpython >=4.2.2"] vispy = ["vispy>=0.16", "pyopengl >=3.1"] pygfx = ["pygfx>=0.16.0", "rendercanvas>=2.6.2", "wgpu>=0.31.0"] -# ready to go bundles with pygfx -qt = ["ndv[pygfx,pyqt]", "imageio[tifffile] >=2.20"] -jup = ["ndv[pygfx,jupyter]", "imageio[tifffile] >=2.20"] -wx = ["ndv[pygfx,wxpython]", "imageio[tifffile] >=2.20"] +# Core IO (tiff + zarr, no Java) +io = [ + "tifffile >=2024.1.30", + "xarray >=2024.7.0", + "yaozarrs >=0.1.0", + "tensorstore >=0.1.69,!=0.1.72", +] + +# Format-specific readers +nd2 = ["nd2[legacy] >=0.10.0"] +lif = ["readlif >=0.6.0"] +czi = ["pylibCZIrw >=4.1.0"] +bioformats = ["bffile >=0.0.1rc1", "xarray >=2024.7.0"] + +# Everything +io-all = ["ndv[io,nd2,lif,czi,bioformats]"] + +# ready to go bundles with pygfx and io +qt = ["ndv[pygfx,pyqt,io-all]"] +jup = ["ndv[pygfx,jupyter,io-all]"] +wx = ["ndv[pygfx,wxpython,io-all]"] [project.urls] diff --git a/src/ndv/cli.py b/src/ndv/cli.py new file mode 100644 index 00000000..73fcf0d4 --- /dev/null +++ b/src/ndv/cli.py @@ -0,0 +1,38 @@ +"""command-line program.""" + +from __future__ import annotations + +import argparse +from typing import Any + +from ndv.util import imshow + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="ndv: ndarray viewer") + parser.add_argument("path", type=str, help="File to view") + parser.add_argument( + "-s", "--series", type=int, default=0, help="Series index (default: 0)" + ) + parser.add_argument( + "-l", "--level", type=int, default=0, help="Resolution level (default: 0)" + ) + return parser.parse_args() + + +def main() -> None: + """Run the command-line program.""" + from ndv import io + + args = _parse_args() + data = io.imread(args.path, series=args.series, level=args.level) + + display_kwargs: dict[str, Any] = {} + if ndv_display := data.attrs.pop("ndv_display", None): + if colors := ndv_display.get("channel_colors"): + display_kwargs["luts"] = {i: {"cmap": c} for i, c in colors.items()} + + if "C" in data.dims and data.sizes["C"] > 1: + display_kwargs.setdefault("channel_mode", "composite") + + imshow(data, **display_kwargs) diff --git a/src/ndv/io/__init__.py b/src/ndv/io/__init__.py new file mode 100644 index 00000000..165ec5e4 --- /dev/null +++ b/src/ndv/io/__init__.py @@ -0,0 +1,7 @@ +"""Opinionated bioimage IO — read common formats into xarray.DataArray.""" + +from __future__ import annotations + +from ndv.io._dispatch import imread + +__all__ = ["imread"] diff --git a/src/ndv/io/_bioformats.py b/src/ndv/io/_bioformats.py new file mode 100644 index 00000000..54a07d68 --- /dev/null +++ b/src/ndv/io/_bioformats.py @@ -0,0 +1,74 @@ +"""Bio-Formats fallback reader using bffile.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pathlib import Path + + import ome_types + import xarray as xr + +EXTENSIONS: set[str] = set() # fallback — handles anything + + +def can_read(path: Path) -> bool: + return True + + +def imread(path: Path, *, series: int = 0, level: int = 0) -> xr.DataArray: + """Read a file using Bio-Formats via bffile.""" + from bffile import BioFile + + bf = BioFile(path) + bf.open() + da = bf.to_xarray(series=series, resolution=-1) + + # Extract channel colors from OME metadata + ndv_display: dict[str, object] = {} + try: + ome = bf.ome_metadata + img = ome.images[series] + channels = img.pixels.channels + colors: dict[int, str] = {} + for i, ch in enumerate(channels): + if ch.color is not None: + cmap_name = _ome_color_to_cmap(ch.color) + if cmap_name: + colors[i] = cmap_name + if colors: + ndv_display["channel_colors"] = colors + except Exception: + pass + + if ndv_display: + da.attrs["ndv_display"] = ndv_display + + # Keep file handle alive + da.attrs["_biofile"] = bf + return da + + +def _ome_color_to_cmap(color: ome_types.model.Color) -> str | None: + """Convert an ome_types Color to a cmap name.""" + try: + r, g, b = color.as_rgb_tuple(alpha=False) # pyright: ignore[reportAssignmentType] + except AttributeError: + return None + + if r > 200 and g < 50 and b < 50: + return "red" + if r < 50 and g > 200 and b < 50: + return "green" + if r < 50 and g < 50 and b > 200: + return "blue" + if r > 200 and g > 200 and b < 50: + return "yellow" + if r > 200 and g < 50 and b > 200: + return "magenta" + if r < 50 and g > 200 and b > 200: + return "cyan" + if r > 200 and g > 200 and b > 200: + return "gray" + return None diff --git a/src/ndv/io/_czi.py b/src/ndv/io/_czi.py new file mode 100644 index 00000000..7792d13e --- /dev/null +++ b/src/ndv/io/_czi.py @@ -0,0 +1,180 @@ +"""CZI reader using pylibCZIrw.""" + +from __future__ import annotations + +import xml.etree.ElementTree as ET +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from pathlib import Path + + import xarray as xr + +EXTENSIONS: set[str] = {".czi"} + + +def can_read(path: Path) -> bool: + return path.is_file() and path.suffix.lower() in EXTENSIONS + + +def imread(path: Path, *, series: int = 0, level: int = 0) -> xr.DataArray: + """Read a CZI file.""" + import numpy as np + import pylibCZIrw.czi as pyczi + import xarray as xr + + with pyczi.open_czi(str(path)) as czidoc: + bb = czidoc.total_bounding_box + scenes = czidoc.scenes_bounding_rectangle + meta = ET.fromstring(czidoc.raw_metadata) + + # Determine ROI from scene or full bounding box + if scenes and series in scenes: + rect = scenes[series] + roi = (rect.x, rect.y, rect.w, rect.h) + else: + roi = (bb["X"][0], bb["Y"][0], bb["X"][1], bb["Y"][1]) + + n_t = bb["T"][1] - bb["T"][0] + n_z = bb["Z"][1] - bb["Z"][0] + n_c = bb["C"][1] - bb["C"][0] + + # Read a sample plane to detect shape/dtype/RGB + # pylibCZIrw returns (Y, X, S) where S=1 for grayscale, 3/4 for RGB + sample = czidoc.read(roi=roi, plane={"T": 0, "Z": 0, "C": 0}) + is_rgb = sample.ndim == 3 and sample.shape[-1] in (3, 4) + if not is_rgb and sample.ndim == 3 and sample.shape[-1] == 1: + sample = sample[..., 0] + + if n_t <= 1 and n_z <= 1 and n_c <= 1: + data = sample + else: + # Read all planes and stack into (T, C, Z, Y, X) + planes = np.empty((n_t, n_c, n_z, *sample.shape), dtype=sample.dtype) + for t in range(n_t): + for c in range(n_c): + for z in range(n_z): + plane = czidoc.read(roi=roi, plane={"T": t, "Z": z, "C": c}) + if not is_rgb and plane.shape[-1] == 1: + plane = plane[..., 0] + planes[t, c, z] = plane + data = planes + + # Build dims — squeeze leading singletons, always keep Y, X + dims: list[str] = [] + squeeze_axes: list[int] = [] + if n_t <= 1 and n_z <= 1 and n_c <= 1: + # Simple image — no leading dims + dims = ["Y", "X"] + if is_rgb: + dims.append("S") + else: + for i, (label, n) in enumerate([("T", n_t), ("C", n_c), ("Z", n_z)]): + if n <= 1: + squeeze_axes.append(i) + dims.append(label) + dims.extend(["Y", "X"]) + if is_rgb: + dims.append("S") + if squeeze_axes: + data = np.squeeze(data, axis=tuple(squeeze_axes)) + dims = [d for i, d in enumerate(dims) if i not in squeeze_axes] + + # Extract metadata + coords: dict[str, Any] = {} + ndv_display: dict[str, object] = {} + scales = _parse_scales(meta) + channels = _parse_channels(meta) + + for dim_label in ("Z", "Y", "X"): + scale = scales.get(dim_label) + if scale is not None and dim_label in dims: + idx = dims.index(dim_label) + n = data.shape[idx] + coords[dim_label] = [i * scale for i in range(n)] + + if channels and "C" in dims: + names = [ch["name"] for ch in channels] + coords["C"] = names + colors: dict[int, str] = {} + for i, ch in enumerate(channels): + if cmap := ch.get("cmap"): + colors[i] = cmap + if colors: + ndv_display["channel_colors"] = colors + + attrs: dict[str, object] = {} + if ndv_display: + attrs["ndv_display"] = ndv_display + + return xr.DataArray(data, dims=dims, coords=coords, attrs=attrs) + + +def _parse_scales(meta: ET.Element) -> dict[str, float]: + """Extract physical pixel sizes from CZI XML metadata.""" + scales: dict[str, float] = {} + for item in meta.iter("Distance"): + dim_id = item.get("Id") + val_el = item.find("Value") + if dim_id and val_el is not None and val_el.text: + try: + scales[dim_id] = float(val_el.text) + except ValueError: + pass + return scales + + +def _parse_channels(meta: ET.Element) -> list[dict[str, str]]: + """Extract acquisition channel names and colors.""" + channels: list[dict[str, str]] = [] + for dims_el in meta.iter("Dimensions"): + ch_el = dims_el.find("Channels") + if ch_el is None: + continue + for ch in ch_el.findall("Channel"): + if ch.get("Id") is None: + continue + info: dict[str, str] = {} + name = ch.get("Name") or ch.findtext("Name") or "" + if name: + info["name"] = name + color = ch.findtext("Color") + if color: + cmap = _hex_color_to_cmap(color) + if cmap: + info["cmap"] = cmap + channels.append(info) + break + return channels + + +def _hex_color_to_cmap(color: str) -> str | None: + """Convert #AARRGGBB hex color to a cmap name.""" + color = color.lstrip("#") + if len(color) == 8: + # ARGB format + r = int(color[2:4], 16) + g = int(color[4:6], 16) + b = int(color[6:8], 16) + elif len(color) == 6: + r = int(color[0:2], 16) + g = int(color[2:4], 16) + b = int(color[4:6], 16) + else: + return None + + if r > 200 and g < 50 and b < 50: + return "red" + if r < 50 and g > 200 and b < 50: + return "green" + if r < 50 and g < 50 and b > 200: + return "blue" + if r > 200 and g > 200 and b < 50: + return "yellow" + if r > 200 and g < 50 and b > 200: + return "magenta" + if r < 50 and g > 200 and b > 200: + return "cyan" + if r > 200 and g > 200 and b > 200: + return "gray" + return None diff --git a/src/ndv/io/_dispatch.py b/src/ndv/io/_dispatch.py new file mode 100644 index 00000000..d04388ef --- /dev/null +++ b/src/ndv/io/_dispatch.py @@ -0,0 +1,94 @@ +"""Reader registry and dispatch loop.""" + +from __future__ import annotations + +import importlib +import logging +from pathlib import Path +from textwrap import indent, wrap +from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable + +if TYPE_CHECKING: + import xarray as xr + +logger = logging.getLogger("ndv.io") + + +@runtime_checkable +class ReaderModule(Protocol): + """Protocol for reader modules.""" + + EXTENSIONS: set[str] + + def can_read(self, path: Path) -> bool: ... + def imread( + self, path: Path, *, series: int = 0, level: int = 0 + ) -> xr.DataArray: ... + + +_READER_MODULES: list[tuple[str, str]] = [ + ("ome-zarr", "ndv.io._ome_zarr"), + ("tifffile", "ndv.io._tiff"), + ("nd2", "ndv.io._nd2"), + ("readlif", "ndv.io._lif"), + ("pylibCZIrw", "ndv.io._czi"), + ("bffile", "ndv.io._bioformats"), +] + + +def _load_reader(module_name: str) -> Any: + return importlib.import_module(module_name) + + +def imread( + path: str | Path, + *, + series: int = 0, + level: int = 0, +) -> xr.DataArray: + """Read a file into an xarray.DataArray with bioimage metadata.""" + path = Path(path) + if not path.exists(): + raise FileNotFoundError(path) + errors: dict[str, Exception] = {} + + for name, mod_name in _READER_MODULES: + reader = _load_reader(mod_name) + if reader.can_read(path): + try: + result = reader.imread(path, series=series, level=level) + logger.info("Read %s with %s", path, name) + return _ensure_all_coords(result) + except ImportError as e: + errors[name] = e + except Exception as e: + errors[name] = e + + raise ValueError(_format_error_message(path, errors)) + + +def _ensure_all_coords(da: xr.DataArray) -> xr.DataArray: + """Ensure every dim has a coordinate (ndv needs them for sliders).""" + missing = { + dim: list(range(da.sizes[dim])) for dim in da.dims if dim not in da.coords + } + if missing: + da = da.assign_coords(missing) + return da + + +def _format_error_message(path: Path, errors: dict[str, Exception]) -> str: + lines = [f"\nCould not read {path}. Tried the following readers:", ""] + for key, err in errors.items(): + lines.append(f"{key}:") + wrapped = wrap(str(err), width=120) + indented = indent("\n".join(wrapped), " ") + lines.append(indented) + msg = "\n".join(lines) + + if any(isinstance(e, ImportError) for e in errors.values()): + msg += ( + "\n\nSome readers failed due to missing packages. " + "Install IO support with: pip install 'ndv[io-all]'" + ) + return msg diff --git a/src/ndv/io/_lif.py b/src/ndv/io/_lif.py new file mode 100644 index 00000000..e2e0577d --- /dev/null +++ b/src/ndv/io/_lif.py @@ -0,0 +1,90 @@ +"""LIF reader using readlif.""" + +from __future__ import annotations + +import itertools +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import Sequence + from pathlib import Path + + import xarray as xr + +EXTENSIONS: set[str] = {".lif"} + +# readlif internal dim index -> named dimension +_IDX_TO_DIM: dict[int, str] = {1: "X", 2: "Y", 3: "Z", 4: "T", 5: "M"} + + +def can_read(path: Path) -> bool: + return path.is_file() and path.suffix.lower() in EXTENSIONS + + +def imread(path: Path, *, series: int = 0, level: int = 0) -> xr.DataArray: + """Read a LIF file.""" + import numpy as np + import xarray as xr + from readlif.reader import LifFile + + lif = LifFile(str(path)) + images = list(lif.get_iter_image()) + if series >= len(images): + raise IndexError(f"Series {series} not found (file has {len(images)} scenes)") + img = images[series] + + display = img.display_dims # e.g. (1, 3) for XZ + dims_n = img.dims_n # e.g. {1: 128, 3: 128, 4: 20} + n_channels = img.channels + + # Separate non-display dims (ordered by index) + nondisplay = {k: v for k, v in dims_n.items() if k not in display} + nd_keys = sorted(nondisplay.keys()) + nd_sizes = [nondisplay[k] for k in nd_keys] + + # Read all planes: iterate (non-display-dims x channels) + sample = np.asarray(img.get_plane(c=0)) + plane_shape = sample.shape + + total_planes = n_channels * (max(1, int(np.prod(nd_sizes))) if nd_sizes else 1) + all_planes = np.empty((total_planes, *plane_shape), dtype=sample.dtype) + idx = 0 + + nd_ranges: list[Sequence] = [range(s) for s in nd_sizes] if nd_sizes else [()] + for nd_vals in itertools.product(*nd_ranges): + req = dict(zip(nd_keys, nd_vals, strict=False)) if nd_keys else {} + for c in range(n_channels): + plane = np.asarray(img.get_plane(c=c, requested_dims=req)) + all_planes[idx] = plane + idx += 1 + + # Build shape: (*non_display, C, display_h, display_w) + shape = (*nd_sizes, n_channels, *plane_shape) + data = all_planes.reshape(shape) + + # Build dim labels + # numpy plane shape is (dims_n[display[1]], dims_n[display[0]]) + dim_labels: list[str] = [] + for k in nd_keys: + dim_labels.append(_IDX_TO_DIM.get(k, f"dim_{k}")) + dim_labels.append("C") + dim_labels.append(_IDX_TO_DIM.get(display[1], f"dim_{display[1]}")) + dim_labels.append(_IDX_TO_DIM.get(display[0], f"dim_{display[0]}")) + + # Squeeze singleton dims + squeeze_axes = [i for i, s in enumerate(data.shape) if s == 1] + if squeeze_axes: + data = np.squeeze(data, axis=tuple(squeeze_axes)) + dim_labels = [d for i, d in enumerate(dim_labels) if i not in squeeze_axes] + + # Physical scales from readlif + coords: dict[str, Any] = {} + scale_n = img.scale_n # {internal_idx: scale_in_microns_per_px} + for internal_idx, scale in scale_n.items(): + dim_name = _IDX_TO_DIM.get(internal_idx) + if dim_name and dim_name in dim_labels: + ax = dim_labels.index(dim_name) + n = data.shape[ax] + coords[dim_name] = [i / scale for i in range(n)] + + return xr.DataArray(data, dims=dim_labels, coords=coords) diff --git a/src/ndv/io/_nd2.py b/src/ndv/io/_nd2.py new file mode 100644 index 00000000..42c73495 --- /dev/null +++ b/src/ndv/io/_nd2.py @@ -0,0 +1,70 @@ +"""ND2 reader using the nd2 library.""" + +from __future__ import annotations + +import atexit +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pathlib import Path + + import xarray as xr + +EXTENSIONS: set[str] = {".nd2"} + + +def can_read(path: Path) -> bool: + return path.is_file() and path.suffix.lower() in EXTENSIONS + + +def imread(path: Path, *, series: int = 0, level: int = 0) -> xr.DataArray: + """Read an ND2 file.""" + import nd2 + + f = nd2.ND2File(path) + atexit.register(f.close) + da = f.to_xarray(position=series, squeeze=False) + + # Extract channel colors + ndv_display: dict[str, object] = {} + try: + colors: dict[int, str] = {} + for i, ch in enumerate(f.metadata.channels): + rgb = ch.channel.colorRGB + cmap_name = _rgb_to_cmap(rgb) + if cmap_name: + colors[i] = cmap_name + if colors: + ndv_display["channel_colors"] = colors + except Exception: + pass + + if ndv_display: + da.attrs["ndv_display"] = ndv_display + + # Keep file handle alive — dask arrays in da reference it + da.attrs["_nd2_file"] = f + return da + + +def _rgb_to_cmap(rgb: int) -> str | None: + """Convert an RGB integer to a cmap name.""" + r = (rgb >> 16) & 0xFF + g = (rgb >> 8) & 0xFF + b = rgb & 0xFF + + if r > 200 and g < 50 and b < 50: + return "red" + if r < 50 and g > 200 and b < 50: + return "green" + if r < 50 and g < 50 and b > 200: + return "blue" + if r > 200 and g > 200 and b < 50: + return "yellow" + if r > 200 and g < 50 and b > 200: + return "magenta" + if r < 50 and g > 200 and b > 200: + return "cyan" + if r > 200 and g > 200 and b > 200: + return "gray" + return None diff --git a/src/ndv/io/_ome_zarr.py b/src/ndv/io/_ome_zarr.py new file mode 100644 index 00000000..5e9ea632 --- /dev/null +++ b/src/ndv/io/_ome_zarr.py @@ -0,0 +1,156 @@ +"""OME-Zarr reader using yaozarrs + tensorstore.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from pathlib import Path + + import xarray as xr + +EXTENSIONS: set[str] = {".zarr"} +logger = logging.getLogger(__name__) + + +def can_read(path: Path) -> bool: + """Check if path is an OME-Zarr or plain Zarr dataset.""" + p = str(path) + if p.endswith(".zarr"): + return True + # directory containing zarr structure + if path.is_dir(): + return (path / ".zattrs").exists() or (path / "zarr.json").exists() + return False + + +def imread(path: Path, *, series: int = 0, level: int = 0) -> xr.DataArray: + """Read an OME-Zarr dataset.""" + import tensorstore as ts + import xarray as xr + import yaozarrs + + group = yaozarrs.open_group(str(path)) + subpath, ome_meta = _navigate_zarr(group, series=series, level=level) + + # Open the array via tensorstore for lazy access + full_path = str(path) + subpath if subpath != "/" else str(path) + store = ts.open({"driver": "zarr", "kvstore": full_path}).result() + + dims, coords, ndv_display = _extract_ome_metadata(ome_meta, series, level) + + # Fall back to generic dims if OME metadata didn't provide them + if not dims: + dims = tuple(f"dim_{i}" for i in range(store.ndim)) + + attrs: dict[str, object] = {} + if ndv_display: + attrs["ndv_display"] = ndv_display + + return xr.DataArray(store, dims=dims, coords=coords, attrs=attrs) + + +def _navigate_zarr( + group: Any, *, series: int = 0, level: int = 0 +) -> tuple[str, dict[str, Any] | None]: + """Navigate zarr group to find the target array and OME metadata.""" + try: + ome = group.ome_metadata() + except Exception: + ome = None + + # Bioformats2Raw layout: nested series groups + if ome is None: + try: + sub_group = group.open_group(str(series)) + ome = sub_group.ome_metadata() + prefix = f"/{series}" + except Exception: + prefix = "" + else: + prefix = "" + + if ome is not None: + multiscales = ome.get("multiscales", []) + if multiscales: + ms = multiscales[series] if series < len(multiscales) else multiscales[0] + datasets = ms.get("datasets", []) + if level < len(datasets): + ds_path = datasets[level].get("path", "0") + elif datasets: + ds_path = datasets[0].get("path", "0") + else: + ds_path = "0" + return f"{prefix}/{ds_path}", ome + + # No OME metadata — try to find any array + return f"{prefix}/{level}", None + + +def _extract_ome_metadata( + ome: dict[str, Any] | None, series: int, level: int +) -> tuple[tuple[str, ...], dict[str, Any], dict[str, object]]: + """Extract dims, coords, and display hints from OME metadata.""" + dims: tuple[str, ...] = () + coords: dict[str, Any] = {} + ndv_display: dict[str, object] = {} + + if ome is None: + return dims, coords, ndv_display + + multiscales = ome.get("multiscales", []) + if not multiscales: + return dims, coords, ndv_display + + ms = multiscales[series] if series < len(multiscales) else multiscales[0] + + # Extract axis names + axes = ms.get("axes", []) + if axes: + dims = tuple( + ax["name"].upper() if len(ax["name"]) == 1 else ax["name"] for ax in axes + ) + + # Extract channel colors from omero metadata + omero = ome.get("omero", {}) + if omero: + channels = omero.get("channels", []) + colors: dict[int, str] = {} + for i, ch in enumerate(channels): + color = ch.get("color") + if color: + cmap_name = _hex_color_to_cmap(color) + if cmap_name: + colors[i] = cmap_name + if colors: + ndv_display["channel_colors"] = colors + + return dims, coords, ndv_display + + +def _hex_color_to_cmap(color: str) -> str | None: + """Convert hex color string to a cmap name.""" + color = color.lstrip("#").upper() + if len(color) < 6: + return None + + r = int(color[0:2], 16) + g = int(color[2:4], 16) + b = int(color[4:6], 16) + + if r > 200 and g < 50 and b < 50: + return "red" + if r < 50 and g > 200 and b < 50: + return "green" + if r < 50 and g < 50 and b > 200: + return "blue" + if r > 200 and g > 200 and b < 50: + return "yellow" + if r > 200 and g < 50 and b > 200: + return "magenta" + if r < 50 and g > 200 and b > 200: + return "cyan" + if r > 200 and g > 200 and b > 200: + return "gray" + return None diff --git a/src/ndv/io/_tiff.py b/src/ndv/io/_tiff.py new file mode 100644 index 00000000..3962ab8b --- /dev/null +++ b/src/ndv/io/_tiff.py @@ -0,0 +1,111 @@ +"""TIFF and OME-TIFF reader using tifffile.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pathlib import Path + + import xarray as xr + +EXTENSIONS: set[str] = {".tif", ".tiff"} + + +def can_read(path: Path) -> bool: + return path.is_file() and path.suffix.lower() in EXTENSIONS + + +def imread(path: Path, *, series: int = 0, level: int = 0) -> xr.DataArray: + """Read a TIFF or OME-TIFF file.""" + import tifffile + import xarray as xr + + tf = tifffile.TiffFile(path) + s = tf.series[series] + + if level > 0 and level < len(s.levels): + s = s.levels[level] + + data = s.asarray() + dims = tuple(s.axes.replace("S", "C")) + + coords: dict[str, list[str]] = {} + attrs: dict[str, object] = {} + ndv_display: dict[str, object] = {} + + # OME metadata for physical sizes and channel info + if tf.is_ome: + ome = tf.ome_metadata + if ome is not None: + _apply_ome_metadata(ome, series, dims, coords, ndv_display) + + if ndv_display: + attrs["ndv_display"] = ndv_display + + return xr.DataArray(data, dims=dims, coords=coords, attrs=attrs) + + +def _apply_ome_metadata( + ome: str, + series: int, + dims: tuple[str, ...], + coords: dict[str, list[str]], + ndv_display: dict[str, object], +) -> None: + import xml.etree.ElementTree as ET + + root = ET.fromstring(ome) + ns = {"ome": root.tag.split("}")[0].lstrip("{")} if "}" in root.tag else {} + prefix = f"{{{ns['ome']}}}" if ns else "" + + images = root.findall(f"{prefix}Image") + if series >= len(images): + return + image = images[series] + pixels = image.find(f"{prefix}Pixels") + if pixels is None: + return + + # Channel names and colors + channels = pixels.findall(f"{prefix}Channel") + if channels and "C" in dims: + names = [] + colors: dict[int, str] = {} + for i, ch in enumerate(channels): + name = ch.get("Name", f"Ch{i}") + names.append(name) + color = ch.get("Color") + if color is not None: + cmap_name = _ome_color_to_cmap(int(color)) + if cmap_name: + colors[i] = cmap_name + coords["C"] = names + if colors: + ndv_display["channel_colors"] = colors + + +def _ome_color_to_cmap(color_int: int) -> str | None: + """Convert OME integer color (RGBA packed) to a cmap name.""" + # OME colors are signed 32-bit RGBA packed integers + color_int = color_int & 0xFFFFFFFF + r = (color_int >> 24) & 0xFF + g = (color_int >> 16) & 0xFF + b = (color_int >> 8) & 0xFF + + # Map common colors to cmap names + if r > 200 and g < 50 and b < 50: + return "red" + if r < 50 and g > 200 and b < 50: + return "green" + if r < 50 and g < 50 and b > 200: + return "blue" + if r > 200 and g > 200 and b < 50: + return "yellow" + if r > 200 and g < 50 and b > 200: + return "magenta" + if r < 50 and g > 200 and b > 200: + return "cyan" + if r > 200 and g > 200 and b > 200: + return "gray" + return None diff --git a/src/ndv/util.py b/src/ndv/util.py index 735f5994..b9c51aa1 100644 --- a/src/ndv/util.py +++ b/src/ndv/util.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, overload +from typing import TYPE_CHECKING, Any, overload from ndv.controllers import ArrayViewer from ndv.views._app import run_app