diff --git a/src/adagio/cli/dynamic.py b/src/adagio/cli/dynamic.py index 3cc81b2..65437d1 100644 --- a/src/adagio/cli/dynamic.py +++ b/src/adagio/cli/dynamic.py @@ -15,6 +15,12 @@ CACHE_DIR_HELP, REUSE_HELP, ) +from ..type_format import ( + compact_type_text, + path_type_label, + render_type_text, + type_label_display_width, +) from .args import ParamType, ShowParamsMode, dynamic_opt, to_identifier @@ -75,10 +81,10 @@ def _pipeline_type_label(type_hint: Any) -> str: def _display_type_label(*, spec_type: str | None, type_hint: Any, is_input: bool) -> str: if is_input: - return "PATH" + return path_type_label(spec_type) if spec_type: - compact = _compact_type_text(spec_type) + compact = compact_type_text(spec_type) if compact.startswith("["): return compact @@ -95,58 +101,8 @@ def _output_path_help(description: str | None) -> str: def _render_pipeline_type( entry: Any, entry_metadata: dict[str, dict[str, Any]], width: int ) -> Any: - from rich.text import Text - label = entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT") - return Text(_wrap_type_label(label, width), style="bold yellow") - - -def _compact_type_text(type_text: str) -> str: - cleaned = type_text.strip() - if "Choices(" not in cleaned: - return f"({cleaned})" - - match = re.search(r"Choices\((.*)\)", cleaned) - if match is None: - return f"({cleaned})" - - choices = [ - choice.strip().strip("'\"") - for choice in match.group(1).split(",") - if choice.strip() - ] - if not choices: - return f"({cleaned})" - return "[" + "|".join(choices) + "]" - - -def _wrap_type_label(label: str, width: int) -> str: - if len(label) <= width or not (label.startswith("[") and label.endswith("]")): - return label - - choices = [choice for choice in label[1:-1].split("|") if choice] - if not choices: - return label - - lines: list[str] = [] - current = "[" - - for index, choice in enumerate(choices): - is_last = index == len(choices) - 1 - separator = "" if current in ("[", " |") else "|" - suffix = "]" if is_last else "" - candidate = current + separator + choice + suffix - - if len(candidate) <= width or current in ("[", " |"): - current = candidate - else: - lines.append(current) - current = " |" + choice + suffix - - if not current.endswith("]"): - current += "]" - lines.append(current) - return "\n".join(lines) + return render_type_text(label, width) def _render_pipeline_description( @@ -191,10 +147,12 @@ def _get_pipeline_parameter_columns( 8, min( max( - len(entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT")) + type_label_display_width( + entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT") + ) for entry in entries ), - max(22, min(34, math.ceil(console.width * 0.3))), + max(28, min(70, math.ceil(console.width * 0.35))), ), ) name_column = ColumnSpec( @@ -565,7 +523,7 @@ def add_param_spec(spec: ParamSpec) -> None: output_bindings.append((ident, original)) opt = dynamic_opt(original, ParamType.OUTPUT) entry_metadata[opt] = { - "type_label": "PATH", + "type_label": path_type_label(spec.type), "default": None, "required": False, } diff --git a/src/adagio/describe.py b/src/adagio/describe.py index 484e920..dc3ab85 100644 --- a/src/adagio/describe.py +++ b/src/adagio/describe.py @@ -6,7 +6,6 @@ from rich.panel import Panel from rich.text import Text -from .cli.dynamic import _compact_type_text from .executors.common import plan_execution_order from .model.pipeline import AdagioPipeline from .model.task import ( @@ -16,6 +15,7 @@ PromotedVal, RootInputTask, ) +from .type_format import TYPE_STYLE, compact_type_text, wrap_type_label @dataclass(frozen=True) @@ -25,6 +25,11 @@ class _DisplayRef: description: str | None = None +_ENTRY_INDENT = " " +# Fixed so pipeline-show output remains stable and easy to copy between terminals. +_PIPELINE_SHOW_TYPE_WIDTH = 72 + + def render_pipeline_text(pipeline: AdagioPipeline) -> Text | Group: available_ids = { input_def.id: _DisplayRef( @@ -230,15 +235,28 @@ def _append_entry_line( rendered.append(name, style="cyan") if value_text is not None: rendered.append(":", style="cyan") + value_rendered = False if type_label: rendered.append(" ") - rendered.append(type_label, style="bold yellow") - if value_text: + wrapped_type = wrap_type_label(type_label, _PIPELINE_SHOW_TYPE_WIDTH) + type_lines = wrapped_type.splitlines() + rendered.append(type_lines[0], style=TYPE_STYLE) + if len(type_lines) > 1: + for line in type_lines[1:]: + rendered.append("\n") + rendered.append(_ENTRY_INDENT) + rendered.append(line, style=TYPE_STYLE) + if value_text: + rendered.append("\n") + rendered.append(_ENTRY_INDENT) + rendered.append(value_text) + value_rendered = True + if value_text and not value_rendered: rendered.append(" ") rendered.append(value_text) rendered.append("\n") if description: - rendered.append(" ") + rendered.append(_ENTRY_INDENT) rendered.append(description, style="dim") rendered.append("\n") @@ -346,7 +364,7 @@ def _format_spec_type(type_text: str | None) -> str | None: cleaned = (type_text or "").strip() if not cleaned: return None - return _compact_type_text(cleaned) + return compact_type_text(cleaned) def _clean_description(description: str | None) -> str | None: diff --git a/src/adagio/model/ast.py b/src/adagio/model/ast.py index d41b099..e50a29b 100644 --- a/src/adagio/model/ast.py +++ b/src/adagio/model/ast.py @@ -1,5 +1,5 @@ import typing as t -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator class TypeASTUnion(BaseModel): @@ -39,9 +39,22 @@ class TypeASTPredicateChoices(TypeASTPredicateBase): class TypeASTPredicateRange(TypeASTPredicateBase): name: t.Literal['Range'] - range: tuple[int, int] | tuple[float, float] + range: tuple[int | None, int | None] | tuple[float | None, float | None] inclusive: tuple[bool, bool] + @field_validator('range', mode='before') + @classmethod + def validate_range_bounds(cls, value): + if not isinstance(value, (list, tuple)) or len(value) != 2: + return value + lower, upper = value + if lower is None and upper is None: + raise ValueError('Range must include at least one bound.') + bounds = [bound for bound in (lower, upper) if bound is not None] + if len(bounds) == 2 and type(bounds[0]) is not type(bounds[1]): + raise ValueError('Range bounds must use the same numeric type.') + return value + class TypeASTPredicateProperties(TypeASTPredicateBase): name: t.Literal['Properties'] @@ -52,4 +65,3 @@ class TypeASTPredicateProperties(TypeASTPredicateBase): t.Union[TypeASTUnion, TypeASTIntersection, TypeASTExpression], Field(discriminator='type') ] - diff --git a/src/adagio/type_format.py b/src/adagio/type_format.py new file mode 100644 index 0000000..e3dcf82 --- /dev/null +++ b/src/adagio/type_format.py @@ -0,0 +1,149 @@ +import re +import textwrap + +from rich.text import Text + + +TYPE_STYLE = "bold yellow" +SEMANTIC_TYPE_STYLE = "bold #84ad50" + + +def compact_type_text(type_text: str) -> str: + cleaned = type_text.strip() + if "Choices(" not in cleaned: + return f"({cleaned})" + + match = re.search(r"Choices\((.*)\)", cleaned) + if match is None: + return f"({cleaned})" + + choices = [ + choice.strip().strip("'\"") + for choice in match.group(1).split(",") + if choice.strip() + ] + if not choices: + return f"({cleaned})" + return "[" + "|".join(choices) + "]" + + +def path_type_label(spec_type: str | None) -> str: + cleaned = (spec_type or "").strip() + if not cleaned: + return "PATH" + return f"PATH\n{cleaned}" + + +def render_type_text(label: str, width: int) -> Text: + wrapped = wrap_type_label(label, width) + lines = wrapped.split("\n") + has_semantic_path_type = label.startswith("PATH\n") + rendered = Text() + for index, line in enumerate(lines): + if index: + rendered.append("\n") + style = ( + SEMANTIC_TYPE_STYLE + if has_semantic_path_type and index > 0 + else TYPE_STYLE + ) + rendered.append(line, style=style) + return rendered + + +def wrap_type_label(label: str, width: int) -> str: + return "\n".join( + line + for raw_line in label.splitlines() + for line in _wrap_type_label_line(raw_line, width) + ) + + +def type_label_display_width(label: str) -> int: + return max((len(line) for line in label.splitlines()), default=0) + + +def _wrap_type_label_line(label: str, width: int) -> list[str]: + if len(label) <= width: + return [label] + if label.startswith("[") and label.endswith("]"): + return _wrap_choice_label(label, width) + if " | " in label: + return _wrap_union_type_label(label, width) + return _wrap_long_type_label(label, width) + + +def _wrap_choice_label(label: str, width: int) -> list[str]: + choices = [choice for choice in label[1:-1].split("|") if choice] + if not choices: + return [label] + + lines: list[str] = [] + current = "[" + + for index, choice in enumerate(choices): + is_last = index == len(choices) - 1 + separator = "" if current in ("[", " |") else "|" + suffix = "]" if is_last else "" + candidate = current + separator + choice + suffix + + if len(candidate) <= width or current in ("[", " |"): + current = candidate + else: + lines.append(current) + current = " |" + choice + suffix + + if not current.endswith("]"): + current += "]" + lines.append(current) + return lines + + +def _wrap_union_type_label(label: str, width: int) -> list[str]: + members = [member for member in label.split(" | ") if member] + if not members: + return [label] + + lines: list[str] = [] + current = "" + for index, member in enumerate(members): + has_next = index < len(members) - 1 + candidate = f"{current} | {member}" if current else member + if current and len(candidate) <= width: + current = candidate + continue + if current: + _append_union_line(lines, current, width) + current = "" + if len(member) <= width: + current = member + else: + lines.extend(_wrap_long_type_label(member, width)) + if has_next: + _append_separator_to_last_line(lines, width) + if current: + lines.append(current) + return lines + + +def _append_union_line(lines: list[str], line: str, width: int) -> None: + if len(line) + 2 <= width: + lines.append(f"{line} |") + else: + lines.append(line) + + +def _append_separator_to_last_line(lines: list[str], width: int) -> None: + if lines and len(lines[-1]) + 2 <= width: + lines[-1] = f"{lines[-1]} |" + + +def _wrap_long_type_label(label: str, width: int) -> list[str]: + break_long_words = any(len(token) > width for token in label.split()) + return textwrap.wrap( + label, + width=width, + subsequent_indent=" ", + break_long_words=break_long_words, + break_on_hyphens=False, + ) or [label] diff --git a/tests/test_pipeline_descriptions.py b/tests/test_pipeline_descriptions.py index ceaec18..a14dba2 100644 --- a/tests/test_pipeline_descriptions.py +++ b/tests/test_pipeline_descriptions.py @@ -3,13 +3,16 @@ from adagio.app.parsers.pipeline import Input, Parameter, parse_inputs, parse_parameters from adagio.cli.dynamic import ( - _compact_type_text, _display_type_label, _pipeline_type_label, - _wrap_type_label, build_dynamic_run, ) from adagio.model.pipeline import AdagioPipeline +from adagio.type_format import ( + compact_type_text, + render_type_text, + wrap_type_label, +) class PipelineDescriptionTests(unittest.TestCase): @@ -136,18 +139,18 @@ def test_dynamic_run_help_includes_descriptions(self) -> None: self.assertNotIn("Pipeline parameter:", param_help) def test_choices_are_rendered_compactly(self) -> None: - compact = _compact_type_text( + compact = compact_type_text( "Str % Choices('ace', 'berger_parker_d', 'brillouin_d')" ) self.assertEqual(compact, "[ace|berger_parker_d|brillouin_d]") - compact_unquoted = _compact_type_text( + compact_unquoted = compact_type_text( "Str % Choices(ace, berger_parker_d, brillouin_d)" ) self.assertEqual(compact_unquoted, "[ace|berger_parker_d|brillouin_d]") def test_long_choice_labels_wrap_on_pipes(self) -> None: - wrapped = _wrap_type_label( + wrapped = wrap_type_label( "[ace|berger_parker_d|brillouin_d|chao1|dominance]", 22 ) self.assertIn("\n", wrapped) @@ -155,6 +158,46 @@ def test_long_choice_labels_wrap_on_pipes(self) -> None: self.assertTrue(wrapped.endswith("]")) self.assertIn("\n |", wrapped) + def test_long_semantic_union_labels_wrap_on_pipes(self) -> None: + wrapped = wrap_type_label( + "PATH\n" + "SampleData[Kraken2Report % Properties('reads', 'contigs', 'mags')]¹ | " + "FeatureData[Kraken2Report % Properties('mags')]²", + 44, + ) + + self.assertTrue(wrapped.startswith("PATH\n")) + self.assertIn("\nFeatureData[Kraken2Report", wrapped) + self.assertTrue(all(len(line) <= 44 for line in wrapped.splitlines())) + + def test_union_wrap_does_not_lead_with_pipe_after_long_first_member(self) -> None: + wrapped = wrap_type_label( + "VeryLongSemanticTypeNameWithoutBreaks | ShortType", + 12, + ) + + self.assertEqual(wrapped.splitlines()[-1], "ShortType") + self.assertNotIn("\n | ShortType", wrapped) + + def test_semantic_type_lines_render_green_after_path(self) -> None: + rendered = render_type_text("PATH\nFeatureTable[Frequency]", 44) + + self.assertEqual(rendered.plain, "PATH\nFeatureTable[Frequency]") + self.assertEqual(rendered.spans[0].style, "bold yellow") + self.assertEqual(rendered.spans[1].style, "bold #84ad50") + + def test_wrapped_semantic_type_lines_render_green_after_path(self) -> None: + rendered = render_type_text( + "PATH\nSampleData[Kraken2Report % Properties('reads', 'contigs')]", + 30, + ) + + self.assertTrue(rendered.plain.startswith("PATH\n")) + self.assertGreater(len(rendered.spans), 2) + self.assertTrue( + all(span.style == "bold #84ad50" for span in rendered.spans[1:]) + ) + def test_pipeline_type_labels_use_general_cli_types(self) -> None: self.assertEqual(_pipeline_type_label(int), "INTEGER") self.assertEqual(_pipeline_type_label(float), "NUMBER") @@ -166,7 +209,7 @@ def test_display_type_label_prefers_choices_and_path(self) -> None: _display_type_label( spec_type="FeatureTable[Frequency]", type_hint=str, is_input=True ), - "PATH", + "PATH\nFeatureTable[Frequency]", ) self.assertEqual( _display_type_label( diff --git a/tests/test_pipeline_show.py b/tests/test_pipeline_show.py index 4324f9c..f724d9e 100644 --- a/tests/test_pipeline_show.py +++ b/tests/test_pipeline_show.py @@ -20,6 +20,19 @@ "fields": [], } +OPEN_ENDED_RANGE_AST = { + "type": "expression", + "builtin": True, + "name": "Int", + "predicate": { + "type": "predicate", + "name": "Range", + "range": [1, None], + "inclusive": [True, False], + }, + "fields": [], +} + def _sample_pipeline_dict() -> dict: return { @@ -225,6 +238,63 @@ def test_pipeline_show_cli_prints_summary(self) -> None: self.assertIn('barcodes: (MetadataColumn[Categorical]) pipeline input "barcodes"', result.stdout) self.assertIn('table (FeatureTable[Frequency])', result.stdout) + def test_pipeline_show_accepts_open_ended_range_predicates(self) -> None: + pipeline_data = _sample_pipeline_dict() + pipeline_data["signature"]["parameters"].append( + { + "id": "param-sampling-depth", + "name": "sampling_depth", + "type": "Int % Range(1, None)", + "ast": OPEN_ENDED_RANGE_AST, + "required": True, + "description": "Sampling depth.", + } + ) + + pipeline = AdagioPipeline.model_validate(pipeline_data) + rendered = _render_plain(render_pipeline_text(pipeline)) + + self.assertIn("dada2.denoise_single", rendered) + + def test_pipeline_show_rejects_invalid_range_predicates(self) -> None: + pipeline_data = _sample_pipeline_dict() + invalid_range_ast = { + **OPEN_ENDED_RANGE_AST, + "predicate": { + **OPEN_ENDED_RANGE_AST["predicate"], + "range": [1, 2.5], + }, + } + pipeline_data["signature"]["parameters"].append( + { + "id": "param-invalid-range", + "name": "invalid_range", + "type": "Int % Range(1, 2.5)", + "ast": invalid_range_ast, + "required": True, + "description": "Invalid range.", + } + ) + + with self.assertRaises(ValueError): + AdagioPipeline.model_validate(pipeline_data) + + def test_pipeline_show_wraps_long_semantic_types(self) -> None: + pipeline_data = _sample_pipeline_dict() + pipeline_data["signature"]["inputs"][0]["type"] = ( + "SampleData[Kraken2Report % Properties('reads', 'contigs', 'mags')]¹ | " + "FeatureData[Kraken2Report % Properties('mags')]²" + ) + pipeline = AdagioPipeline.model_validate(pipeline_data) + + rendered = _render_plain(render_pipeline_text(pipeline)) + + self.assertIn( + "seqs: (SampleData[Kraken2Report % Properties", rendered + ) + self.assertIn("FeatureData[Kraken2Report", rendered) + self.assertIn('pipeline input "seqs"', rendered) + def test_render_pipeline_text_displays_collection_inputs(self) -> None: pipeline = AdagioPipeline.model_validate(_collection_pipeline_dict())