Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ using [PEP 440](https://packaging.python.org/en/latest/specifications/version-sp

## [Unreleased]

## 0.1.0a4 - 2026-05-01

### Added

- Adds generated qapi metadata transformer actions so compatible artifacts can
be converted to metadata inside exported Adagio pipelines.
- Adds pipeline/runtime support for built-in metadata conversion steps and
archive collection bindings.

### Fixed

- Fixes optional pipeline inputs so omitted optional values are not treated as
required at runtime.
- Fixes dynamic run options so `--show-params` only controls help display and
does not affect which CLI options can be passed.

## 0.1.0a3 - 2026-05-01

- Adds support for collections. Adagio pipelines with collections are now handled
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "adagio-cli"
version = "0.1.0a3"
version = "0.1.0a4"
description = "Command-line runner for Adagio pipeline files."
readme = "README.md"
requires-python = ">=3.10"
Expand Down
18 changes: 18 additions & 0 deletions src/adagio/cli/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ def build_dynamic_run(
input_specs: list[InputSpec],
param_specs: list[ParamSpec],
output_specs: list[OutputSpec],
visible_input_names: set[str] | None = None,
visible_param_names: set[str] | None = None,
visible_output_names: set[str] | None = None,
argument_inputs: dict[str, Any] | None = None,
argument_params: dict[str, Any] | None = None,
run_handler: Callable[
Expand All @@ -261,6 +264,13 @@ def build_dynamic_run(
],
):
"""Build a dynamic run command from pipeline input, parameter, and output specs."""
visible_input_names = (
set(visible_input_names) if visible_input_names is not None else None
)
visible_param_names = set(visible_param_names) if visible_param_names is not None else None
visible_output_names = (
set(visible_output_names) if visible_output_names is not None else None
)
input_bindings: list[tuple[str, str]] = []
param_bindings: list[tuple[str, str]] = []
output_bindings: list[tuple[str, str]] = []
Expand Down Expand Up @@ -401,6 +411,7 @@ def add_dynamic_option(
help_text: str,
default: Any,
group: Group | tuple[Group, ...],
show: bool = True,
) -> None:
if opt in seen_opts:
raise ValueError(f"Conflicting CLI option generated: {opt!r}.")
Expand All @@ -414,6 +425,7 @@ def add_dynamic_option(
group=group,
help=help_text,
required=required,
show=show,
),
]
parameters.append(
Expand Down Expand Up @@ -446,6 +458,7 @@ def add_input_spec(spec: InputSpec) -> None:

type_text = spec.type
opt = dynamic_opt(original, ParamType.INPUT)
show = visible_input_names is None or original in visible_input_names
entry_metadata[opt] = {
"type_label": _display_type_label(
spec_type=type_text, type_hint=str, is_input=True
Expand All @@ -463,6 +476,7 @@ def add_input_spec(spec: InputSpec) -> None:
),
default=None,
group=pipeline_group,
show=show,
)

def add_param_spec(spec: ParamSpec) -> None:
Expand All @@ -486,6 +500,7 @@ def add_param_spec(spec: ParamSpec) -> None:
param_default = None
param_type: Any = _resolve_param_type(spec.type, default)
opt = dynamic_opt(original, ParamType.PARAM)
show = visible_param_names is None or original in visible_param_names
if is_required:
required_params.append(original)
entry_metadata[opt] = {
Expand All @@ -505,6 +520,7 @@ def add_param_spec(spec: ParamSpec) -> None:
),
default=param_default,
group=pipeline_group,
show=show,
)

for spec in required_input_specs:
Expand All @@ -526,6 +542,7 @@ def add_param_spec(spec: ParamSpec) -> None:
seen_idents.add(ident)
output_bindings.append((ident, original))
opt = dynamic_opt(original, ParamType.OUTPUT)
show = visible_output_names is None or original in visible_output_names
entry_metadata[opt] = {
"type_label": path_type_label(spec.type),
"default": None,
Expand All @@ -541,6 +558,7 @@ def add_param_spec(spec: ParamSpec) -> None:
),
default=None,
group=pipeline_group,
show=show,
)

def run(
Expand Down
9 changes: 6 additions & 3 deletions src/adagio/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,12 @@ def run(
)

dynamic_run = build_dynamic_run(
input_specs=visible_inputs,
param_specs=visible_params,
output_specs=visible_outputs,
input_specs=input_specs,
param_specs=param_specs,
output_specs=output_specs,
visible_input_names={spec.name for spec in visible_inputs},
visible_param_names={spec.name for spec in visible_params},
visible_output_names={spec.name for spec in visible_outputs},
argument_inputs=arguments_data.get("inputs", {}) if arguments_data else None,
argument_params=arguments_data.get("parameters", {}) if arguments_data else None,
run_handler=partial(run_pipeline_from_kwargs, console=console),
Expand Down
11 changes: 9 additions & 2 deletions src/adagio/executors/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
from adagio.model.task import input_source_ids


def plan_execution_order(*, tasks: list[t.Any], scope: dict[str, t.Any]) -> list[t.Any]:
def plan_execution_order(
*,
tasks: list[t.Any],
scope: dict[str, t.Any],
optional_missing_ids: set[str] | None = None,
) -> list[t.Any]:
"""Return a dependency-respecting serial execution plan."""
available_ids = set(scope.keys())
optional_missing_ids = optional_missing_ids or set()
remaining = list(tasks)
planned: list[t.Any] = []

Expand All @@ -16,7 +22,7 @@ def plan_execution_order(*, tasks: list[t.Any], scope: dict[str, t.Any]) -> list
source_id
for src in task.inputs.values()
for source_id in input_source_ids(src)
if source_id not in available_ids
if source_id not in available_ids and source_id not in optional_missing_ids
]
if missing:
continue
Expand All @@ -35,6 +41,7 @@ def plan_execution_order(*, tasks: list[t.Any], scope: dict[str, t.Any]) -> list
for src in task.inputs.values()
for source_id in input_source_ids(src)
if source_id not in available_ids
and source_id not in optional_missing_ids
)
details.append(f"{task.id}: missing [{missing}]")
raise RuntimeError("Unable to resolve task dependencies. " + "; ".join(details))
Expand Down
17 changes: 15 additions & 2 deletions src/adagio/executors/serial_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class SerialExecutionState:
params: dict[str, t.Any]
scope: dict[str, InputSource]
cache_config: ExecutionCacheConfig | None
missing_optional_ids: set[str] = field(default_factory=set)
saved_output_ids: set[str] = field(default_factory=set)
save_output_started: bool = False

Expand Down Expand Up @@ -64,13 +65,21 @@ def run_serial_pipeline(

active_monitor.start_load_input()
for input_def in sig.inputs:
source = arguments.inputs[input_def.name]
source = arguments.inputs.get(input_def.name)
if _is_missing(source):
if not input_def.required:
state.missing_optional_ids.add(input_def.id)
continue
state.scope[input_def.id] = resolve_pipeline_input(
source=source, type_name=input_def.type, cwd=state.cwd
)
active_monitor.finish_load_input()

execution_plan = plan_execution_order(tasks=tasks, scope=state.scope)
execution_plan = plan_execution_order(
tasks=tasks,
scope=state.scope,
optional_missing_ids=state.missing_optional_ids,
)
for task in execution_plan:
active_monitor.queue_task(
task_id=task.id,
Expand Down Expand Up @@ -138,6 +147,10 @@ def resolve_monitor(*, console: Console | None, monitor: Monitor | None) -> Moni
return LogMonitor()


def _is_missing(value: t.Any) -> bool:
return value is None or value == "" or value == "<fill me>" or value == [] or value == {}


def resolve_pipeline_input(
*, source: InputSource, type_name: str, cwd: Path
) -> InputSource:
Expand Down
38 changes: 35 additions & 3 deletions src/adagio/executors/task_environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from rich.console import Console

from adagio.model.arguments import AdagioArguments
from adagio.model.task import PluginActionTask, RootInputTask
from adagio.model.task import ConvertToMetadataTask, PluginActionTask, RootInputTask
from adagio.monitor.api import Monitor

from .base import (
Expand Down Expand Up @@ -60,9 +60,21 @@ def _resolve_task(
if isinstance(task, RootInputTask):
for name, src in task.inputs.items():
dst = task.outputs[name]
if src.id in state.missing_optional_ids:
state.missing_optional_ids.add(dst.id)
continue
state.scope[dst.id] = state.scope[src.id]
return False

if isinstance(task, ConvertToMetadataTask):
if task.inputs["data"].id in state.missing_optional_ids:
state.missing_optional_ids.add(task.outputs["metadata"].id)
return False
state.scope[task.outputs["metadata"].id] = state.scope[
task.inputs["data"].id
]
return False

if isinstance(task, PluginActionTask):
return self._execute_plugin_action(
task=task,
Expand Down Expand Up @@ -91,6 +103,8 @@ def _execute_plugin_action(
metadata_inputs: dict[str, str] = {}
for name, src in task.inputs.items():
if src.kind == "archive":
if src.id in state.missing_optional_ids:
continue
value = state.scope[src.id]
if isinstance(value, list):
archive_collection_inputs[name] = value
Expand All @@ -99,10 +113,15 @@ def _execute_plugin_action(
else:
archive_inputs[name] = value
elif src.kind == "archive-collection":
archive_collection_inputs[name] = _flatten_collection_items(
[state.scope[item.id] for item in src.items]
values = _present_collection_item_values(
items=src.items,
state=state,
)
if values:
archive_collection_inputs[name] = _flatten_collection_items(values)
elif src.kind == "metadata":
if src.id in state.missing_optional_ids:
continue
value = state.scope[src.id]
if not isinstance(value, str):
raise TypeError(
Expand Down Expand Up @@ -190,6 +209,19 @@ def _flatten_collection_items(
return result


def _present_collection_item_values(
*,
items,
state: SerialExecutionState,
) -> list[str | list[str] | dict[str, str]]:
values: list[str | list[str] | dict[str, str]] = []
for item in items:
if item.id in state.missing_optional_ids:
continue
values.append(state.scope[item.id])
return values


def _save_outputs(
*,
sig,
Expand Down
11 changes: 8 additions & 3 deletions src/adagio/model/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import typing as t
import os
import json

from pydantic import BaseModel, RootModel, model_validator, Field
from pydantic import BaseModel, RootModel, model_validator


from .arguments import AdagioArguments
Expand Down Expand Up @@ -68,7 +67,9 @@ def load_inputs(self, ctx, arguments, scope):
from adagio.io import load_input, load_input_collection, load_metadata

for input in self.inputs:
source = arguments.inputs[input.name]
source = arguments.inputs.get(input.name)
if _is_missing(source):
continue
if _is_metadata_ast(input.ast):
print("SCHEDULED:", f'load_metadata({source!r})')
scope[input.id] = load_metadata(ctx=ctx, source=source)
Expand Down Expand Up @@ -140,3 +141,7 @@ def _is_metadata_ast(ast: TypeAST) -> bool:

def _is_collection_type(type_name: str) -> bool:
return type_name.startswith('List[') or type_name.startswith('Collection[')


def _is_missing(value: t.Any) -> bool:
return value is None or value == "" or value == "<fill me>" or value == [] or value == {}
Loading
Loading