From b88140b93d7c4ade78055413a6dfe83bedbe023b Mon Sep 17 00:00:00 2001 From: John Chase Date: Sat, 25 Apr 2026 11:13:52 -0700 Subject: [PATCH 1/2] Adds types, improves terminal formatting --- src/adagio/cli/dynamic.py | 117 +++++++++++++++++++++++++--- src/adagio/describe.py | 22 +++++- src/adagio/model/ast.py | 3 +- tests/test_pipeline_descriptions.py | 22 +++++- tests/test_pipeline_show.py | 47 +++++++++++ 5 files changed, 196 insertions(+), 15 deletions(-) diff --git a/src/adagio/cli/dynamic.py b/src/adagio/cli/dynamic.py index 3cc81b2..462acc9 100644 --- a/src/adagio/cli/dynamic.py +++ b/src/adagio/cli/dynamic.py @@ -1,6 +1,7 @@ import inspect import math import re +import textwrap import types from pathlib import Path from typing import Any, Annotated, Callable, Union, get_args, get_origin @@ -46,6 +47,10 @@ def __call__(self, console: Any, options: Any, panel: Any) -> None: console.print(PanelSpec().build(RichGroup(*renderables), title=panel.title)) +_TYPE_STYLE = "bold yellow" +_SEMANTIC_TYPE_STYLE = "bold #84ad50" + + def _entry_key(entry: Any) -> str: options = entry.all_options if hasattr(entry, "all_options") else () return next((name for name in options if name.startswith("--")), "") @@ -75,7 +80,7 @@ def _pipeline_type_label(type_hint: Any) -> str: def _display_type_label(*, spec_type: str | None, type_hint: Any, is_input: bool) -> str: if is_input: - return "PATH" + return _path_type_label(spec_type) if spec_type: compact = _compact_type_text(spec_type) @@ -85,6 +90,13 @@ def _display_type_label(*, spec_type: str | None, type_hint: Any, is_input: bool return _pipeline_type_label(type_hint) +def _path_type_label(spec_type: str | None) -> str: + cleaned = (spec_type or "").strip() + if not cleaned: + return "PATH" + return f"PATH\n{cleaned}" + + def _output_path_help(description: str | None) -> str: cleaned = (description or "").strip() if cleaned: @@ -95,10 +107,25 @@ def _output_path_help(description: str | None) -> str: def _render_pipeline_type( entry: Any, entry_metadata: dict[str, dict[str, Any]], width: int ) -> Any: + label = entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT") + return _render_type_text(label, width) + + +def _render_type_text(label: str, width: int) -> Any: from rich.text import Text - label = entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT") - return Text(_wrap_type_label(label, width), style="bold yellow") + wrapped = _wrap_type_label(label, width) + if not label.startswith("PATH\n"): + return Text(wrapped, style=_TYPE_STYLE) + + rendered = Text() + lines = wrapped.split("\n") + for index, line in enumerate(lines): + if index: + rendered.append("\n") + style = _TYPE_STYLE if index == 0 and line == "PATH" else _SEMANTIC_TYPE_STYLE + rendered.append(line, style=style) + return rendered def _compact_type_text(type_text: str) -> str: @@ -121,12 +148,30 @@ def _compact_type_text(type_text: str) -> str: def _wrap_type_label(label: str, width: int) -> str: + return "\n".join( + line + for raw_line in label.splitlines() + for line in _wrap_type_label_line(raw_line, width) + ) + + +def _wrap_type_label_line(label: str, width: int) -> list[str]: + if len(label) <= width: + return [label] + if label.startswith("[") and label.endswith("]"): + return _wrap_choice_label(label, width) + if " | " in label: + return _wrap_union_type_label(label, width) + return _wrap_long_type_label(label, width) + + +def _wrap_choice_label(label: str, width: int) -> list[str]: if len(label) <= width or not (label.startswith("[") and label.endswith("]")): - return label + return [label] choices = [choice for choice in label[1:-1].split("|") if choice] if not choices: - return label + return [label] lines: list[str] = [] current = "[" @@ -146,7 +191,59 @@ def _wrap_type_label(label: str, width: int) -> str: if not current.endswith("]"): current += "]" lines.append(current) - return "\n".join(lines) + return lines + + +def _wrap_union_type_label(label: str, width: int) -> list[str]: + members = [member for member in label.split(" | ") if member] + if not members: + return [label] + + lines: list[str] = [] + current = "" + for index, member in enumerate(members): + part = member if index == 0 else f" | {member}" + if not current: + if len(part) <= width: + current = part + else: + lines.extend(_wrap_long_type_label(part, width)) + elif len(current) + len(part) <= width: + current += part + else: + lines.append(current) + if len(part) <= width: + current = part + else: + lines.extend(_wrap_long_type_label(part, width)) + current = "" + + if current: + lines.append(current) + return lines + + +def _wrap_long_type_label(label: str, width: int) -> list[str]: + lines = textwrap.wrap( + label, + width=width, + subsequent_indent=" ", + break_long_words=False, + break_on_hyphens=False, + ) + if lines and all(len(line) <= width for line in lines): + return lines + return textwrap.wrap( + label, + width=width, + subsequent_indent=" ", + break_long_words=True, + break_on_hyphens=False, + ) or [label] + + +def _type_label_display_width(label: str) -> int: + return max((len(line) for line in label.splitlines()), default=0) def _render_pipeline_description( @@ -191,10 +288,12 @@ def _get_pipeline_parameter_columns( 8, min( max( - len(entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT")) + _type_label_display_width( + entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT") + ) for entry in entries ), - max(22, min(34, math.ceil(console.width * 0.3))), + max(28, min(70, math.ceil(console.width * 0.35))), ), ) name_column = ColumnSpec( @@ -565,7 +664,7 @@ def add_param_spec(spec: ParamSpec) -> None: output_bindings.append((ident, original)) opt = dynamic_opt(original, ParamType.OUTPUT) entry_metadata[opt] = { - "type_label": "PATH", + "type_label": _path_type_label(spec.type), "default": None, "required": False, } diff --git a/src/adagio/describe.py b/src/adagio/describe.py index 484e920..c38da68 100644 --- a/src/adagio/describe.py +++ b/src/adagio/describe.py @@ -6,7 +6,7 @@ from rich.panel import Panel from rich.text import Text -from .cli.dynamic import _compact_type_text +from .cli.dynamic import _compact_type_text, _wrap_type_label from .executors.common import plan_execution_order from .model.pipeline import AdagioPipeline from .model.task import ( @@ -25,6 +25,10 @@ class _DisplayRef: description: str | None = None +_ENTRY_INDENT = " " +_PIPELINE_SHOW_TYPE_WIDTH = 72 + + def render_pipeline_text(pipeline: AdagioPipeline) -> Text | Group: available_ids = { input_def.id: _DisplayRef( @@ -232,13 +236,25 @@ def _append_entry_line( rendered.append(":", style="cyan") if type_label: rendered.append(" ") - rendered.append(type_label, style="bold yellow") + wrapped_type = _wrap_type_label(type_label, _PIPELINE_SHOW_TYPE_WIDTH) + type_lines = wrapped_type.splitlines() + rendered.append(type_lines[0], style="bold yellow") + if len(type_lines) > 1: + for line in type_lines[1:]: + rendered.append("\n") + rendered.append(_ENTRY_INDENT) + rendered.append(line, style="bold yellow") + if value_text: + rendered.append("\n") + rendered.append(_ENTRY_INDENT) + rendered.append(value_text) + value_text = None if value_text: rendered.append(" ") rendered.append(value_text) rendered.append("\n") if description: - rendered.append(" ") + rendered.append(_ENTRY_INDENT) rendered.append(description, style="dim") rendered.append("\n") diff --git a/src/adagio/model/ast.py b/src/adagio/model/ast.py index d41b099..e2d50da 100644 --- a/src/adagio/model/ast.py +++ b/src/adagio/model/ast.py @@ -39,7 +39,7 @@ class TypeASTPredicateChoices(TypeASTPredicateBase): class TypeASTPredicateRange(TypeASTPredicateBase): name: t.Literal['Range'] - range: tuple[int, int] | tuple[float, float] + range: tuple[int | float | None, int | float | None] inclusive: tuple[bool, bool] @@ -52,4 +52,3 @@ class TypeASTPredicateProperties(TypeASTPredicateBase): t.Union[TypeASTUnion, TypeASTIntersection, TypeASTExpression], Field(discriminator='type') ] - diff --git a/tests/test_pipeline_descriptions.py b/tests/test_pipeline_descriptions.py index ceaec18..b362f48 100644 --- a/tests/test_pipeline_descriptions.py +++ b/tests/test_pipeline_descriptions.py @@ -6,6 +6,7 @@ _compact_type_text, _display_type_label, _pipeline_type_label, + _render_type_text, _wrap_type_label, build_dynamic_run, ) @@ -155,6 +156,25 @@ def test_long_choice_labels_wrap_on_pipes(self) -> None: self.assertTrue(wrapped.endswith("]")) self.assertIn("\n |", wrapped) + def test_long_semantic_union_labels_wrap_on_pipes(self) -> None: + wrapped = _wrap_type_label( + "PATH\n" + "SampleData[Kraken2Report % Properties('reads', 'contigs', 'mags')]¹ | " + "FeatureData[Kraken2Report % Properties('mags')]²", + 44, + ) + + self.assertTrue(wrapped.startswith("PATH\n")) + self.assertIn("\n | FeatureData[Kraken2Report", wrapped) + self.assertTrue(all(len(line) <= 44 for line in wrapped.splitlines())) + + def test_semantic_type_lines_render_green_after_path(self) -> None: + rendered = _render_type_text("PATH\nFeatureTable[Frequency]", 44) + + self.assertEqual(rendered.plain, "PATH\nFeatureTable[Frequency]") + self.assertEqual(rendered.spans[0].style, "bold yellow") + self.assertEqual(rendered.spans[1].style, "bold #84ad50") + def test_pipeline_type_labels_use_general_cli_types(self) -> None: self.assertEqual(_pipeline_type_label(int), "INTEGER") self.assertEqual(_pipeline_type_label(float), "NUMBER") @@ -166,7 +186,7 @@ def test_display_type_label_prefers_choices_and_path(self) -> None: _display_type_label( spec_type="FeatureTable[Frequency]", type_hint=str, is_input=True ), - "PATH", + "PATH\nFeatureTable[Frequency]", ) self.assertEqual( _display_type_label( diff --git a/tests/test_pipeline_show.py b/tests/test_pipeline_show.py index 4324f9c..be7829c 100644 --- a/tests/test_pipeline_show.py +++ b/tests/test_pipeline_show.py @@ -20,6 +20,19 @@ "fields": [], } +OPEN_ENDED_RANGE_AST = { + "type": "expression", + "builtin": True, + "name": "Int", + "predicate": { + "type": "predicate", + "name": "Range", + "range": [1, None], + "inclusive": [True, False], + }, + "fields": [], +} + def _sample_pipeline_dict() -> dict: return { @@ -225,6 +238,40 @@ def test_pipeline_show_cli_prints_summary(self) -> None: self.assertIn('barcodes: (MetadataColumn[Categorical]) pipeline input "barcodes"', result.stdout) self.assertIn('table (FeatureTable[Frequency])', result.stdout) + def test_pipeline_show_accepts_open_ended_range_predicates(self) -> None: + pipeline_data = _sample_pipeline_dict() + pipeline_data["signature"]["parameters"].append( + { + "id": "param-sampling-depth", + "name": "sampling_depth", + "type": "Int % Range(1, None)", + "ast": OPEN_ENDED_RANGE_AST, + "required": True, + "description": "Sampling depth.", + } + ) + + pipeline = AdagioPipeline.model_validate(pipeline_data) + rendered = _render_plain(render_pipeline_text(pipeline)) + + self.assertIn("dada2.denoise_single", rendered) + + def test_pipeline_show_wraps_long_semantic_types(self) -> None: + pipeline_data = _sample_pipeline_dict() + pipeline_data["signature"]["inputs"][0]["type"] = ( + "SampleData[Kraken2Report % Properties('reads', 'contigs', 'mags')]¹ | " + "FeatureData[Kraken2Report % Properties('mags')]²" + ) + pipeline = AdagioPipeline.model_validate(pipeline_data) + + rendered = _render_plain(render_pipeline_text(pipeline)) + + self.assertIn( + "seqs: (SampleData[Kraken2Report % Properties", rendered + ) + self.assertIn("| FeatureData[Kraken2Report", rendered) + self.assertIn('pipeline input "seqs"', rendered) + def test_render_pipeline_text_displays_collection_inputs(self) -> None: pipeline = AdagioPipeline.model_validate(_collection_pipeline_dict()) From a0d3612890b0f06dc5a1b0964aa1a5c51c6714a0 Mon Sep 17 00:00:00 2001 From: John Chase Date: Sat, 25 Apr 2026 11:44:18 -0700 Subject: [PATCH 2/2] Cleans up edge cases --- src/adagio/cli/dynamic.py | 163 ++-------------------------- src/adagio/describe.py | 16 +-- src/adagio/model/ast.py | 17 ++- src/adagio/type_format.py | 149 +++++++++++++++++++++++++ tests/test_pipeline_descriptions.py | 41 +++++-- tests/test_pipeline_show.py | 25 ++++- 6 files changed, 240 insertions(+), 171 deletions(-) create mode 100644 src/adagio/type_format.py diff --git a/src/adagio/cli/dynamic.py b/src/adagio/cli/dynamic.py index 462acc9..65437d1 100644 --- a/src/adagio/cli/dynamic.py +++ b/src/adagio/cli/dynamic.py @@ -1,7 +1,6 @@ import inspect import math import re -import textwrap import types from pathlib import Path from typing import Any, Annotated, Callable, Union, get_args, get_origin @@ -16,6 +15,12 @@ CACHE_DIR_HELP, REUSE_HELP, ) +from ..type_format import ( + compact_type_text, + path_type_label, + render_type_text, + type_label_display_width, +) from .args import ParamType, ShowParamsMode, dynamic_opt, to_identifier @@ -47,10 +52,6 @@ def __call__(self, console: Any, options: Any, panel: Any) -> None: console.print(PanelSpec().build(RichGroup(*renderables), title=panel.title)) -_TYPE_STYLE = "bold yellow" -_SEMANTIC_TYPE_STYLE = "bold #84ad50" - - def _entry_key(entry: Any) -> str: options = entry.all_options if hasattr(entry, "all_options") else () return next((name for name in options if name.startswith("--")), "") @@ -80,23 +81,16 @@ def _pipeline_type_label(type_hint: Any) -> str: def _display_type_label(*, spec_type: str | None, type_hint: Any, is_input: bool) -> str: if is_input: - return _path_type_label(spec_type) + return path_type_label(spec_type) if spec_type: - compact = _compact_type_text(spec_type) + compact = compact_type_text(spec_type) if compact.startswith("["): return compact return _pipeline_type_label(type_hint) -def _path_type_label(spec_type: str | None) -> str: - cleaned = (spec_type or "").strip() - if not cleaned: - return "PATH" - return f"PATH\n{cleaned}" - - def _output_path_help(description: str | None) -> str: cleaned = (description or "").strip() if cleaned: @@ -108,142 +102,7 @@ def _render_pipeline_type( entry: Any, entry_metadata: dict[str, dict[str, Any]], width: int ) -> Any: label = entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT") - return _render_type_text(label, width) - - -def _render_type_text(label: str, width: int) -> Any: - from rich.text import Text - - wrapped = _wrap_type_label(label, width) - if not label.startswith("PATH\n"): - return Text(wrapped, style=_TYPE_STYLE) - - rendered = Text() - lines = wrapped.split("\n") - for index, line in enumerate(lines): - if index: - rendered.append("\n") - style = _TYPE_STYLE if index == 0 and line == "PATH" else _SEMANTIC_TYPE_STYLE - rendered.append(line, style=style) - return rendered - - -def _compact_type_text(type_text: str) -> str: - cleaned = type_text.strip() - if "Choices(" not in cleaned: - return f"({cleaned})" - - match = re.search(r"Choices\((.*)\)", cleaned) - if match is None: - return f"({cleaned})" - - choices = [ - choice.strip().strip("'\"") - for choice in match.group(1).split(",") - if choice.strip() - ] - if not choices: - return f"({cleaned})" - return "[" + "|".join(choices) + "]" - - -def _wrap_type_label(label: str, width: int) -> str: - return "\n".join( - line - for raw_line in label.splitlines() - for line in _wrap_type_label_line(raw_line, width) - ) - - -def _wrap_type_label_line(label: str, width: int) -> list[str]: - if len(label) <= width: - return [label] - if label.startswith("[") and label.endswith("]"): - return _wrap_choice_label(label, width) - if " | " in label: - return _wrap_union_type_label(label, width) - return _wrap_long_type_label(label, width) - - -def _wrap_choice_label(label: str, width: int) -> list[str]: - if len(label) <= width or not (label.startswith("[") and label.endswith("]")): - return [label] - - choices = [choice for choice in label[1:-1].split("|") if choice] - if not choices: - return [label] - - lines: list[str] = [] - current = "[" - - for index, choice in enumerate(choices): - is_last = index == len(choices) - 1 - separator = "" if current in ("[", " |") else "|" - suffix = "]" if is_last else "" - candidate = current + separator + choice + suffix - - if len(candidate) <= width or current in ("[", " |"): - current = candidate - else: - lines.append(current) - current = " |" + choice + suffix - - if not current.endswith("]"): - current += "]" - lines.append(current) - return lines - - -def _wrap_union_type_label(label: str, width: int) -> list[str]: - members = [member for member in label.split(" | ") if member] - if not members: - return [label] - - lines: list[str] = [] - current = "" - for index, member in enumerate(members): - part = member if index == 0 else f" | {member}" - if not current: - if len(part) <= width: - current = part - else: - lines.extend(_wrap_long_type_label(part, width)) - elif len(current) + len(part) <= width: - current += part - else: - lines.append(current) - if len(part) <= width: - current = part - else: - lines.extend(_wrap_long_type_label(part, width)) - current = "" - - if current: - lines.append(current) - return lines - - -def _wrap_long_type_label(label: str, width: int) -> list[str]: - lines = textwrap.wrap( - label, - width=width, - subsequent_indent=" ", - break_long_words=False, - break_on_hyphens=False, - ) - if lines and all(len(line) <= width for line in lines): - return lines - return textwrap.wrap( - label, - width=width, - subsequent_indent=" ", - break_long_words=True, - break_on_hyphens=False, - ) or [label] - - -def _type_label_display_width(label: str) -> int: - return max((len(line) for line in label.splitlines()), default=0) + return render_type_text(label, width) def _render_pipeline_description( @@ -288,7 +147,7 @@ def _get_pipeline_parameter_columns( 8, min( max( - _type_label_display_width( + type_label_display_width( entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT") ) for entry in entries @@ -664,7 +523,7 @@ def add_param_spec(spec: ParamSpec) -> None: output_bindings.append((ident, original)) opt = dynamic_opt(original, ParamType.OUTPUT) entry_metadata[opt] = { - "type_label": _path_type_label(spec.type), + "type_label": path_type_label(spec.type), "default": None, "required": False, } diff --git a/src/adagio/describe.py b/src/adagio/describe.py index c38da68..dc3ab85 100644 --- a/src/adagio/describe.py +++ b/src/adagio/describe.py @@ -6,7 +6,6 @@ from rich.panel import Panel from rich.text import Text -from .cli.dynamic import _compact_type_text, _wrap_type_label from .executors.common import plan_execution_order from .model.pipeline import AdagioPipeline from .model.task import ( @@ -16,6 +15,7 @@ PromotedVal, RootInputTask, ) +from .type_format import TYPE_STYLE, compact_type_text, wrap_type_label @dataclass(frozen=True) @@ -26,6 +26,7 @@ class _DisplayRef: _ENTRY_INDENT = " " +# Fixed so pipeline-show output remains stable and easy to copy between terminals. _PIPELINE_SHOW_TYPE_WIDTH = 72 @@ -234,22 +235,23 @@ def _append_entry_line( rendered.append(name, style="cyan") if value_text is not None: rendered.append(":", style="cyan") + value_rendered = False if type_label: rendered.append(" ") - wrapped_type = _wrap_type_label(type_label, _PIPELINE_SHOW_TYPE_WIDTH) + wrapped_type = wrap_type_label(type_label, _PIPELINE_SHOW_TYPE_WIDTH) type_lines = wrapped_type.splitlines() - rendered.append(type_lines[0], style="bold yellow") + rendered.append(type_lines[0], style=TYPE_STYLE) if len(type_lines) > 1: for line in type_lines[1:]: rendered.append("\n") rendered.append(_ENTRY_INDENT) - rendered.append(line, style="bold yellow") + rendered.append(line, style=TYPE_STYLE) if value_text: rendered.append("\n") rendered.append(_ENTRY_INDENT) rendered.append(value_text) - value_text = None - if value_text: + value_rendered = True + if value_text and not value_rendered: rendered.append(" ") rendered.append(value_text) rendered.append("\n") @@ -362,7 +364,7 @@ def _format_spec_type(type_text: str | None) -> str | None: cleaned = (type_text or "").strip() if not cleaned: return None - return _compact_type_text(cleaned) + return compact_type_text(cleaned) def _clean_description(description: str | None) -> str | None: diff --git a/src/adagio/model/ast.py b/src/adagio/model/ast.py index e2d50da..e50a29b 100644 --- a/src/adagio/model/ast.py +++ b/src/adagio/model/ast.py @@ -1,5 +1,5 @@ import typing as t -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator class TypeASTUnion(BaseModel): @@ -39,9 +39,22 @@ class TypeASTPredicateChoices(TypeASTPredicateBase): class TypeASTPredicateRange(TypeASTPredicateBase): name: t.Literal['Range'] - range: tuple[int | float | None, int | float | None] + range: tuple[int | None, int | None] | tuple[float | None, float | None] inclusive: tuple[bool, bool] + @field_validator('range', mode='before') + @classmethod + def validate_range_bounds(cls, value): + if not isinstance(value, (list, tuple)) or len(value) != 2: + return value + lower, upper = value + if lower is None and upper is None: + raise ValueError('Range must include at least one bound.') + bounds = [bound for bound in (lower, upper) if bound is not None] + if len(bounds) == 2 and type(bounds[0]) is not type(bounds[1]): + raise ValueError('Range bounds must use the same numeric type.') + return value + class TypeASTPredicateProperties(TypeASTPredicateBase): name: t.Literal['Properties'] diff --git a/src/adagio/type_format.py b/src/adagio/type_format.py new file mode 100644 index 0000000..e3dcf82 --- /dev/null +++ b/src/adagio/type_format.py @@ -0,0 +1,149 @@ +import re +import textwrap + +from rich.text import Text + + +TYPE_STYLE = "bold yellow" +SEMANTIC_TYPE_STYLE = "bold #84ad50" + + +def compact_type_text(type_text: str) -> str: + cleaned = type_text.strip() + if "Choices(" not in cleaned: + return f"({cleaned})" + + match = re.search(r"Choices\((.*)\)", cleaned) + if match is None: + return f"({cleaned})" + + choices = [ + choice.strip().strip("'\"") + for choice in match.group(1).split(",") + if choice.strip() + ] + if not choices: + return f"({cleaned})" + return "[" + "|".join(choices) + "]" + + +def path_type_label(spec_type: str | None) -> str: + cleaned = (spec_type or "").strip() + if not cleaned: + return "PATH" + return f"PATH\n{cleaned}" + + +def render_type_text(label: str, width: int) -> Text: + wrapped = wrap_type_label(label, width) + lines = wrapped.split("\n") + has_semantic_path_type = label.startswith("PATH\n") + rendered = Text() + for index, line in enumerate(lines): + if index: + rendered.append("\n") + style = ( + SEMANTIC_TYPE_STYLE + if has_semantic_path_type and index > 0 + else TYPE_STYLE + ) + rendered.append(line, style=style) + return rendered + + +def wrap_type_label(label: str, width: int) -> str: + return "\n".join( + line + for raw_line in label.splitlines() + for line in _wrap_type_label_line(raw_line, width) + ) + + +def type_label_display_width(label: str) -> int: + return max((len(line) for line in label.splitlines()), default=0) + + +def _wrap_type_label_line(label: str, width: int) -> list[str]: + if len(label) <= width: + return [label] + if label.startswith("[") and label.endswith("]"): + return _wrap_choice_label(label, width) + if " | " in label: + return _wrap_union_type_label(label, width) + return _wrap_long_type_label(label, width) + + +def _wrap_choice_label(label: str, width: int) -> list[str]: + choices = [choice for choice in label[1:-1].split("|") if choice] + if not choices: + return [label] + + lines: list[str] = [] + current = "[" + + for index, choice in enumerate(choices): + is_last = index == len(choices) - 1 + separator = "" if current in ("[", " |") else "|" + suffix = "]" if is_last else "" + candidate = current + separator + choice + suffix + + if len(candidate) <= width or current in ("[", " |"): + current = candidate + else: + lines.append(current) + current = " |" + choice + suffix + + if not current.endswith("]"): + current += "]" + lines.append(current) + return lines + + +def _wrap_union_type_label(label: str, width: int) -> list[str]: + members = [member for member in label.split(" | ") if member] + if not members: + return [label] + + lines: list[str] = [] + current = "" + for index, member in enumerate(members): + has_next = index < len(members) - 1 + candidate = f"{current} | {member}" if current else member + if current and len(candidate) <= width: + current = candidate + continue + if current: + _append_union_line(lines, current, width) + current = "" + if len(member) <= width: + current = member + else: + lines.extend(_wrap_long_type_label(member, width)) + if has_next: + _append_separator_to_last_line(lines, width) + if current: + lines.append(current) + return lines + + +def _append_union_line(lines: list[str], line: str, width: int) -> None: + if len(line) + 2 <= width: + lines.append(f"{line} |") + else: + lines.append(line) + + +def _append_separator_to_last_line(lines: list[str], width: int) -> None: + if lines and len(lines[-1]) + 2 <= width: + lines[-1] = f"{lines[-1]} |" + + +def _wrap_long_type_label(label: str, width: int) -> list[str]: + break_long_words = any(len(token) > width for token in label.split()) + return textwrap.wrap( + label, + width=width, + subsequent_indent=" ", + break_long_words=break_long_words, + break_on_hyphens=False, + ) or [label] diff --git a/tests/test_pipeline_descriptions.py b/tests/test_pipeline_descriptions.py index b362f48..a14dba2 100644 --- a/tests/test_pipeline_descriptions.py +++ b/tests/test_pipeline_descriptions.py @@ -3,14 +3,16 @@ from adagio.app.parsers.pipeline import Input, Parameter, parse_inputs, parse_parameters from adagio.cli.dynamic import ( - _compact_type_text, _display_type_label, _pipeline_type_label, - _render_type_text, - _wrap_type_label, build_dynamic_run, ) from adagio.model.pipeline import AdagioPipeline +from adagio.type_format import ( + compact_type_text, + render_type_text, + wrap_type_label, +) class PipelineDescriptionTests(unittest.TestCase): @@ -137,18 +139,18 @@ def test_dynamic_run_help_includes_descriptions(self) -> None: self.assertNotIn("Pipeline parameter:", param_help) def test_choices_are_rendered_compactly(self) -> None: - compact = _compact_type_text( + compact = compact_type_text( "Str % Choices('ace', 'berger_parker_d', 'brillouin_d')" ) self.assertEqual(compact, "[ace|berger_parker_d|brillouin_d]") - compact_unquoted = _compact_type_text( + compact_unquoted = compact_type_text( "Str % Choices(ace, berger_parker_d, brillouin_d)" ) self.assertEqual(compact_unquoted, "[ace|berger_parker_d|brillouin_d]") def test_long_choice_labels_wrap_on_pipes(self) -> None: - wrapped = _wrap_type_label( + wrapped = wrap_type_label( "[ace|berger_parker_d|brillouin_d|chao1|dominance]", 22 ) self.assertIn("\n", wrapped) @@ -157,7 +159,7 @@ def test_long_choice_labels_wrap_on_pipes(self) -> None: self.assertIn("\n |", wrapped) def test_long_semantic_union_labels_wrap_on_pipes(self) -> None: - wrapped = _wrap_type_label( + wrapped = wrap_type_label( "PATH\n" "SampleData[Kraken2Report % Properties('reads', 'contigs', 'mags')]¹ | " "FeatureData[Kraken2Report % Properties('mags')]²", @@ -165,16 +167,37 @@ def test_long_semantic_union_labels_wrap_on_pipes(self) -> None: ) self.assertTrue(wrapped.startswith("PATH\n")) - self.assertIn("\n | FeatureData[Kraken2Report", wrapped) + self.assertIn("\nFeatureData[Kraken2Report", wrapped) self.assertTrue(all(len(line) <= 44 for line in wrapped.splitlines())) + def test_union_wrap_does_not_lead_with_pipe_after_long_first_member(self) -> None: + wrapped = wrap_type_label( + "VeryLongSemanticTypeNameWithoutBreaks | ShortType", + 12, + ) + + self.assertEqual(wrapped.splitlines()[-1], "ShortType") + self.assertNotIn("\n | ShortType", wrapped) + def test_semantic_type_lines_render_green_after_path(self) -> None: - rendered = _render_type_text("PATH\nFeatureTable[Frequency]", 44) + rendered = render_type_text("PATH\nFeatureTable[Frequency]", 44) self.assertEqual(rendered.plain, "PATH\nFeatureTable[Frequency]") self.assertEqual(rendered.spans[0].style, "bold yellow") self.assertEqual(rendered.spans[1].style, "bold #84ad50") + def test_wrapped_semantic_type_lines_render_green_after_path(self) -> None: + rendered = render_type_text( + "PATH\nSampleData[Kraken2Report % Properties('reads', 'contigs')]", + 30, + ) + + self.assertTrue(rendered.plain.startswith("PATH\n")) + self.assertGreater(len(rendered.spans), 2) + self.assertTrue( + all(span.style == "bold #84ad50" for span in rendered.spans[1:]) + ) + def test_pipeline_type_labels_use_general_cli_types(self) -> None: self.assertEqual(_pipeline_type_label(int), "INTEGER") self.assertEqual(_pipeline_type_label(float), "NUMBER") diff --git a/tests/test_pipeline_show.py b/tests/test_pipeline_show.py index be7829c..f724d9e 100644 --- a/tests/test_pipeline_show.py +++ b/tests/test_pipeline_show.py @@ -256,6 +256,29 @@ def test_pipeline_show_accepts_open_ended_range_predicates(self) -> None: self.assertIn("dada2.denoise_single", rendered) + def test_pipeline_show_rejects_invalid_range_predicates(self) -> None: + pipeline_data = _sample_pipeline_dict() + invalid_range_ast = { + **OPEN_ENDED_RANGE_AST, + "predicate": { + **OPEN_ENDED_RANGE_AST["predicate"], + "range": [1, 2.5], + }, + } + pipeline_data["signature"]["parameters"].append( + { + "id": "param-invalid-range", + "name": "invalid_range", + "type": "Int % Range(1, 2.5)", + "ast": invalid_range_ast, + "required": True, + "description": "Invalid range.", + } + ) + + with self.assertRaises(ValueError): + AdagioPipeline.model_validate(pipeline_data) + def test_pipeline_show_wraps_long_semantic_types(self) -> None: pipeline_data = _sample_pipeline_dict() pipeline_data["signature"]["inputs"][0]["type"] = ( @@ -269,7 +292,7 @@ def test_pipeline_show_wraps_long_semantic_types(self) -> None: self.assertIn( "seqs: (SampleData[Kraken2Report % Properties", rendered ) - self.assertIn("| FeatureData[Kraken2Report", rendered) + self.assertIn("FeatureData[Kraken2Report", rendered) self.assertIn('pipeline input "seqs"', rendered) def test_render_pipeline_text_displays_collection_inputs(self) -> None: