Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 14 additions & 56 deletions src/adagio/cli/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
CACHE_DIR_HELP,
REUSE_HELP,
)
from ..type_format import (
compact_type_text,
path_type_label,
render_type_text,
type_label_display_width,
)
from .args import ParamType, ShowParamsMode, dynamic_opt, to_identifier


Expand Down Expand Up @@ -75,10 +81,10 @@ def _pipeline_type_label(type_hint: Any) -> str:

def _display_type_label(*, spec_type: str | None, type_hint: Any, is_input: bool) -> str:
if is_input:
return "PATH"
return path_type_label(spec_type)

if spec_type:
compact = _compact_type_text(spec_type)
compact = compact_type_text(spec_type)
if compact.startswith("["):
return compact

Expand All @@ -95,58 +101,8 @@ def _output_path_help(description: str | None) -> str:
def _render_pipeline_type(
entry: Any, entry_metadata: dict[str, dict[str, Any]], width: int
) -> Any:
from rich.text import Text

label = entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT")
return Text(_wrap_type_label(label, width), style="bold yellow")


def _compact_type_text(type_text: str) -> str:
cleaned = type_text.strip()
if "Choices(" not in cleaned:
return f"({cleaned})"

match = re.search(r"Choices\((.*)\)", cleaned)
if match is None:
return f"({cleaned})"

choices = [
choice.strip().strip("'\"")
for choice in match.group(1).split(",")
if choice.strip()
]
if not choices:
return f"({cleaned})"
return "[" + "|".join(choices) + "]"


def _wrap_type_label(label: str, width: int) -> str:
if len(label) <= width or not (label.startswith("[") and label.endswith("]")):
return label

choices = [choice for choice in label[1:-1].split("|") if choice]
if not choices:
return label

lines: list[str] = []
current = "["

for index, choice in enumerate(choices):
is_last = index == len(choices) - 1
separator = "" if current in ("[", " |") else "|"
suffix = "]" if is_last else ""
candidate = current + separator + choice + suffix

if len(candidate) <= width or current in ("[", " |"):
current = candidate
else:
lines.append(current)
current = " |" + choice + suffix

if not current.endswith("]"):
current += "]"
lines.append(current)
return "\n".join(lines)
return render_type_text(label, width)


def _render_pipeline_description(
Expand Down Expand Up @@ -191,10 +147,12 @@ def _get_pipeline_parameter_columns(
8,
min(
max(
len(entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT"))
type_label_display_width(
entry_metadata.get(_entry_key(entry), {}).get("type_label", "TEXT")
)
for entry in entries
),
max(22, min(34, math.ceil(console.width * 0.3))),
max(28, min(70, math.ceil(console.width * 0.35))),
),
)
name_column = ColumnSpec(
Expand Down Expand Up @@ -565,7 +523,7 @@ def add_param_spec(spec: ParamSpec) -> None:
output_bindings.append((ident, original))
opt = dynamic_opt(original, ParamType.OUTPUT)
entry_metadata[opt] = {
"type_label": "PATH",
"type_label": path_type_label(spec.type),
"default": None,
"required": False,
}
Expand Down
28 changes: 23 additions & 5 deletions src/adagio/describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from rich.panel import Panel
from rich.text import Text

from .cli.dynamic import _compact_type_text
from .executors.common import plan_execution_order
from .model.pipeline import AdagioPipeline
from .model.task import (
Expand All @@ -16,6 +15,7 @@
PromotedVal,
RootInputTask,
)
from .type_format import TYPE_STYLE, compact_type_text, wrap_type_label


@dataclass(frozen=True)
Expand All @@ -25,6 +25,11 @@ class _DisplayRef:
description: str | None = None


_ENTRY_INDENT = " "
# Fixed so pipeline-show output remains stable and easy to copy between terminals.
_PIPELINE_SHOW_TYPE_WIDTH = 72


def render_pipeline_text(pipeline: AdagioPipeline) -> Text | Group:
available_ids = {
input_def.id: _DisplayRef(
Expand Down Expand Up @@ -230,15 +235,28 @@ def _append_entry_line(
rendered.append(name, style="cyan")
if value_text is not None:
rendered.append(":", style="cyan")
value_rendered = False
if type_label:
rendered.append(" ")
rendered.append(type_label, style="bold yellow")
if value_text:
wrapped_type = wrap_type_label(type_label, _PIPELINE_SHOW_TYPE_WIDTH)
type_lines = wrapped_type.splitlines()
rendered.append(type_lines[0], style=TYPE_STYLE)
if len(type_lines) > 1:
for line in type_lines[1:]:
rendered.append("\n")
rendered.append(_ENTRY_INDENT)
rendered.append(line, style=TYPE_STYLE)
if value_text:
rendered.append("\n")
rendered.append(_ENTRY_INDENT)
rendered.append(value_text)
value_rendered = True
if value_text and not value_rendered:
rendered.append(" ")
rendered.append(value_text)
rendered.append("\n")
if description:
rendered.append(" ")
rendered.append(_ENTRY_INDENT)
rendered.append(description, style="dim")
rendered.append("\n")

Expand Down Expand Up @@ -346,7 +364,7 @@ def _format_spec_type(type_text: str | None) -> str | None:
cleaned = (type_text or "").strip()
if not cleaned:
return None
return _compact_type_text(cleaned)
return compact_type_text(cleaned)


def _clean_description(description: str | None) -> str | None:
Expand Down
18 changes: 15 additions & 3 deletions src/adagio/model/ast.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import typing as t
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator


class TypeASTUnion(BaseModel):
Expand Down Expand Up @@ -39,9 +39,22 @@ class TypeASTPredicateChoices(TypeASTPredicateBase):

class TypeASTPredicateRange(TypeASTPredicateBase):
name: t.Literal['Range']
range: tuple[int, int] | tuple[float, float]
range: tuple[int | None, int | None] | tuple[float | None, float | None]
inclusive: tuple[bool, bool]

@field_validator('range', mode='before')
@classmethod
def validate_range_bounds(cls, value):
if not isinstance(value, (list, tuple)) or len(value) != 2:
return value
lower, upper = value
if lower is None and upper is None:
raise ValueError('Range must include at least one bound.')
bounds = [bound for bound in (lower, upper) if bound is not None]
if len(bounds) == 2 and type(bounds[0]) is not type(bounds[1]):
raise ValueError('Range bounds must use the same numeric type.')
return value


class TypeASTPredicateProperties(TypeASTPredicateBase):
name: t.Literal['Properties']
Expand All @@ -52,4 +65,3 @@ class TypeASTPredicateProperties(TypeASTPredicateBase):
t.Union[TypeASTUnion, TypeASTIntersection, TypeASTExpression],
Field(discriminator='type')
]

149 changes: 149 additions & 0 deletions src/adagio/type_format.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import re
import textwrap

from rich.text import Text


TYPE_STYLE = "bold yellow"
SEMANTIC_TYPE_STYLE = "bold #84ad50"


def compact_type_text(type_text: str) -> str:
cleaned = type_text.strip()
if "Choices(" not in cleaned:
return f"({cleaned})"

match = re.search(r"Choices\((.*)\)", cleaned)
if match is None:
return f"({cleaned})"

choices = [
choice.strip().strip("'\"")
for choice in match.group(1).split(",")
if choice.strip()
]
if not choices:
return f"({cleaned})"
return "[" + "|".join(choices) + "]"


def path_type_label(spec_type: str | None) -> str:
cleaned = (spec_type or "").strip()
if not cleaned:
return "PATH"
return f"PATH\n{cleaned}"


def render_type_text(label: str, width: int) -> Text:
wrapped = wrap_type_label(label, width)
lines = wrapped.split("\n")
has_semantic_path_type = label.startswith("PATH\n")
rendered = Text()
for index, line in enumerate(lines):
if index:
rendered.append("\n")
style = (
SEMANTIC_TYPE_STYLE
if has_semantic_path_type and index > 0
else TYPE_STYLE
)
rendered.append(line, style=style)
return rendered


def wrap_type_label(label: str, width: int) -> str:
return "\n".join(
line
for raw_line in label.splitlines()
for line in _wrap_type_label_line(raw_line, width)
)


def type_label_display_width(label: str) -> int:
return max((len(line) for line in label.splitlines()), default=0)


def _wrap_type_label_line(label: str, width: int) -> list[str]:
if len(label) <= width:
return [label]
if label.startswith("[") and label.endswith("]"):
return _wrap_choice_label(label, width)
if " | " in label:
return _wrap_union_type_label(label, width)
return _wrap_long_type_label(label, width)


def _wrap_choice_label(label: str, width: int) -> list[str]:
choices = [choice for choice in label[1:-1].split("|") if choice]
if not choices:
return [label]

lines: list[str] = []
current = "["

for index, choice in enumerate(choices):
is_last = index == len(choices) - 1
separator = "" if current in ("[", " |") else "|"
suffix = "]" if is_last else ""
candidate = current + separator + choice + suffix

if len(candidate) <= width or current in ("[", " |"):
current = candidate
else:
lines.append(current)
current = " |" + choice + suffix

if not current.endswith("]"):
current += "]"
lines.append(current)
return lines


def _wrap_union_type_label(label: str, width: int) -> list[str]:
members = [member for member in label.split(" | ") if member]
if not members:
return [label]

lines: list[str] = []
current = ""
for index, member in enumerate(members):
has_next = index < len(members) - 1
candidate = f"{current} | {member}" if current else member
if current and len(candidate) <= width:
current = candidate
continue
if current:
_append_union_line(lines, current, width)
current = ""
if len(member) <= width:
current = member
else:
lines.extend(_wrap_long_type_label(member, width))
if has_next:
_append_separator_to_last_line(lines, width)
if current:
lines.append(current)
return lines


def _append_union_line(lines: list[str], line: str, width: int) -> None:
if len(line) + 2 <= width:
lines.append(f"{line} |")
else:
lines.append(line)


def _append_separator_to_last_line(lines: list[str], width: int) -> None:
if lines and len(lines[-1]) + 2 <= width:
lines[-1] = f"{lines[-1]} |"


def _wrap_long_type_label(label: str, width: int) -> list[str]:
break_long_words = any(len(token) > width for token in label.split())
return textwrap.wrap(
label,
width=width,
subsequent_indent=" ",
break_long_words=break_long_words,
break_on_hyphens=False,
) or [label]
Loading
Loading