Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion src/specify_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2091,13 +2091,85 @@ def _parse_input_values(input_values: list[str] | None) -> dict[str, Any]:

def _workflow_run_payload(state: Any) -> dict[str, Any]:
"""Machine-readable summary of a run/resume outcome."""
return {
payload = {
"run_id": state.run_id,
"workflow_id": state.workflow_id,
"status": state.status.value,
"current_step_id": state.current_step_id,
"current_step_index": state.current_step_index,
}
gate = _gate_outcome(state)
if gate is not None:
payload["gate"] = gate
return payload


def _is_gate_step(step: dict[str, Any]) -> bool:
"""Whether a recorded step result is a gate.

Prefers the persisted ``type`` field, but when it is absent — a run paused
by an older version, whose step record predates ``type`` being stored —
falls back to the gate's unique output signature: only ``GateStep`` writes
an ``on_reject`` key. A record carrying a *different* known ``type`` is not
a gate, so the fallback applies only when ``type`` is missing entirely.
"""
step_type = step.get("type")
if step_type == "gate":
return True
if step_type:
return False
output = step.get("output")
return isinstance(output, dict) and "on_reject" in output


def _gate_outcome(state: Any) -> dict[str, Any] | None:
"""Gate detail for the structured outcome, when the run rests at a gate.

A paused or gate-aborted run is otherwise indistinguishable from any
other pause/abort in the machine-readable payload; surfacing the gate's
prompt, options, and (after an interactive choice) the decision lets
orchestrators drive review gates without parsing the human-facing stream.
"""
# Two run states rest *on* a gate: `paused` (awaiting a decision) and
# `aborted` (a gate rejected with `on_reject: abort` — the only path that
# sets ABORTED, leaving current_step_id on that gate). Any other status —
# notably `completed`/`failed` — must be suppressed: current_step_id is
# not cleared when a run whose last executed step was a gate moves on, so
# without this guard it would surface stale detail (run/resume/status).
if getattr(state.status, "value", state.status) not in ("paused", "aborted"):
return None
step = (getattr(state, "step_results", None) or {}).get(state.current_step_id)
if not isinstance(step, dict) or not _is_gate_step(step):
return None
output = step.get("output") or {}
# `message`, `options`, and `choice` may be non-string YAML literals in an
# unvalidated workflow (GateStep coerces none of them for the payload), so
# normalise all three for a stable JSON schema: message → str, options →
# list[str] | None, choice → str | None (None means no decision yet).
message = output.get("message")
choice = output.get("choice")
return {
"step_id": state.current_step_id,
"message": None if message is None else str(message),
"options": _normalize_gate_options(output.get("options")),
"choice": None if choice is None else str(choice),
}


def _normalize_gate_options(options: Any) -> list[str] | None:
"""Normalise a gate's ``options`` to a stable ``list[str]`` (or ``None``).

A valid gate stores a list, but an unvalidated workflow could leave a
scalar or tuple. ``None`` stays ``None`` (no options); a list/tuple maps
each element through ``str``; any other scalar becomes a single-element
list — so the emitted JSON schema is always ``list[str] | None``. A bare
string is treated as one option, never iterated character-by-character.
"""
if options is None:
return None
if isinstance(options, (list, tuple)):
return [str(o) for o in options]
return [str(options)]


def _run_outcome_exit_code(status_value: str) -> int:
Expand Down
7 changes: 4 additions & 3 deletions src/specify_cli/workflows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ class StepContext:
#: Resolved workflow inputs (from user prompts / defaults).
inputs: dict[str, Any] = field(default_factory=dict)

#: Accumulated step results keyed by step ID.
#: Each entry is ``{"integration": ..., "model": ..., "options": ...,
#: "input": ..., "output": ...}``.
#: Accumulated step results keyed by step ID. Each entry is the dict the
#: engine persists per step:
#: ``{"type": ..., "integration": ..., "model": ..., "options": ...,
#: "input": ..., "output": ..., "status": ...}``.
steps: dict[str, dict[str, Any]] = field(default_factory=dict)

#: Current fan-out item (set only inside fan-out iterations).
Expand Down
1 change: 1 addition & 0 deletions src/specify_cli/workflows/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ def _execute_steps(

# Record step results — prefer resolved values from step output
step_data = {
"type": step_type,
"integration": result.output.get("integration")
or step_config.get("integration")
Comment thread
mnriem marked this conversation as resolved.
or context.default_integration,
Expand Down
231 changes: 231 additions & 0 deletions tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -5288,3 +5288,234 @@ def test_resume_failed_run_exits_nonzero(self, tmp_path, monkeypatch):
assert resumed.exit_code == 1, resumed.stdout
payload = _json.loads(resumed.stdout)
assert payload["status"] == "failed"


class TestWorkflowRunGateOutcomeJson:
"""CLI-level tests: the --json payload surfaces gate pauses."""

_WF_GATE = """
schema_version: "1.0"
workflow:
id: "gate-json"
name: "Gate JSON"
version: "1.0.0"
steps:
- id: review
type: gate
message: "Approve the thing?"
options: ["approve", "reject"]
"""

_WF_PLAIN = """
schema_version: "1.0"
workflow:
id: "plain-json"
name: "Plain JSON"
version: "1.0.0"
steps:
- id: fine
type: shell
run: "exit 0"
"""

def _run_json(self, tmp_path, monkeypatch, content, *, expected_exit=0):
import json as _json
from typer.testing import CliRunner
from specify_cli import app

path = tmp_path / "wf.yml"
path.write_text(content, encoding="utf-8")
monkeypatch.chdir(tmp_path)
result = CliRunner().invoke(app, ["workflow", "run", str(path), "--json"])
# Assert the expected exit code before parsing so a real failure
# surfaces the actual output instead of an opaque JSON decode error.
# A terminal run still emits its JSON payload, then exits non-zero on
# ``failed``/``aborted`` (see ``_run_outcome_exit_code``), so callers
# pass the expected code. Use ``result.output`` for the message:
# under ``--json`` step output is redirected off stdout, so the useful
# diagnostics live there.
assert result.exit_code == expected_exit, result.output
return _json.loads(result.stdout)

def test_gate_pause_carries_gate_block(self, tmp_path, monkeypatch):
# CliRunner stdin is not a TTY, so the gate pauses for resume.
payload = self._run_json(tmp_path, monkeypatch, self._WF_GATE)
assert payload["status"] == "paused"
assert payload["gate"] == {
"step_id": "review",
"message": "Approve the thing?",
"options": ["approve", "reject"],
"choice": None,
}

def test_completed_run_has_no_gate_block(self, tmp_path, monkeypatch):
payload = self._run_json(tmp_path, monkeypatch, self._WF_PLAIN)
assert payload["status"] == "completed"
assert "gate" not in payload

def test_gate_abort_carries_gate_block(self, tmp_path, monkeypatch):
# An interactive gate the operator rejects ends the run as `aborted`
# (on_reject defaults to abort), not `paused`. The JSON surface must
# still carry the gate block with the recorded choice so an
# orchestrator can see *why* the run stopped. A gate abort emits the
# payload and then exits non-zero (aborted → exit 1), so the helper
# is told to expect exit code 1.
from specify_cli.workflows.steps.gate import GateStep

_force_gate_stdin(monkeypatch, tty=True)
monkeypatch.setattr(
GateStep, "_prompt", staticmethod(lambda _msg, _opts: "reject")
)
payload = self._run_json(
tmp_path, monkeypatch, self._WF_GATE, expected_exit=1
)
assert payload["status"] == "aborted"
assert payload["gate"] == {
"step_id": "review",
"message": "Approve the thing?",
"options": ["approve", "reject"],
"choice": "reject",
}

def test_gate_block_emitted_only_when_run_rests_at_gate(self):
# A run rests *on* a gate only while `paused` (awaiting a decision) or
# `aborted` (gate rejected with on_reject: abort). current_step_id is
# not cleared afterwards, so a `completed`/`failed` run whose last
# executed step was a gate must NOT surface a stale gate block.
from types import SimpleNamespace
from specify_cli import _gate_outcome

gate_step = {
"type": "gate",
"output": {
"message": "m",
"options": ["approve", "reject"],
"choice": "reject",
},
}

def _state(status):
return SimpleNamespace(
status=SimpleNamespace(value=status),
current_step_id="review",
step_results={"review": gate_step},
)

assert _gate_outcome(_state("completed")) is None
assert _gate_outcome(_state("failed")) is None
assert _gate_outcome(_state("paused")) is not None
assert _gate_outcome(_state("aborted")) is not None

def test_gate_block_message_coerced_to_string(self):
# message may be a non-string YAML literal (e.g. a number); the JSON
# surface normalises it so the emitted schema stays stable.
from types import SimpleNamespace
from specify_cli import _gate_outcome

state = SimpleNamespace(
status=SimpleNamespace(value="paused"),
current_step_id="review",
step_results={
"review": {
"type": "gate",
"output": {"message": 12.5, "options": ["ok"], "choice": None},
}
},
)
assert _gate_outcome(state)["message"] == "12.5"

def test_gate_block_options_coerced_to_strings(self):
# options may be non-string / non-list literals in an unvalidated
# workflow; the JSON surface always normalises them to list[str] | None
# so the emitted schema is stable regardless of the input shape.
from types import SimpleNamespace
from specify_cli import _gate_outcome

def _options_payload(options):
state = SimpleNamespace(
status=SimpleNamespace(value="paused"),
current_step_id="review",
step_results={
"review": {
"type": "gate",
"output": {
"message": "m",
"options": options,
"choice": None,
},
}
},
)
return _gate_outcome(state)["options"]

assert _options_payload([1, 2.5]) == ["1", "2.5"] # list
assert _options_payload(("approve", "reject")) == ["approve", "reject"] # tuple
assert _options_payload("approve") == ["approve"] # bare scalar, not iterated
assert _options_payload(7) == ["7"] # numeric scalar
assert _options_payload(None) is None # absent stays absent

def test_gate_block_choice_coerced_to_string(self):
# An unvalidated gate can record a non-string choice; the JSON
# surface normalises it to str (and keeps None = no decision yet),
# consistent with the message/options normalization.
from types import SimpleNamespace
from specify_cli import _gate_outcome

def _choice_payload(choice):
state = SimpleNamespace(
status=SimpleNamespace(value="paused"),
current_step_id="review",
step_results={
"review": {
"type": "gate",
"output": {"message": "m", "options": ["ok"], "choice": choice},
}
},
)
return _gate_outcome(state)["choice"]

assert _choice_payload(None) is None # no decision yet
assert _choice_payload("reject") == "reject" # normal string passes through
assert _choice_payload(2) == "2" # non-string coerced

def test_gate_block_detected_without_type_field(self):
# A run paused by an older version has no persisted step `type`. The
# gate is still detected by its unique output signature (`on_reject`),
# so resume surfaces the gate block instead of silently dropping it.
from types import SimpleNamespace
from specify_cli import _gate_outcome

state = SimpleNamespace(
status=SimpleNamespace(value="paused"),
current_step_id="review",
step_results={
"review": {
# no "type" key — pre-dates the field being persisted
"output": {
"message": "Approve?",
"options": ["approve", "reject"],
"on_reject": "abort",
"choice": None,
},
}
},
)
gate = _gate_outcome(state)
assert gate is not None
assert gate["step_id"] == "review"
assert gate["options"] == ["approve", "reject"]

def test_non_gate_step_without_type_is_not_a_gate(self):
# A typeless record lacking the gate signature must NOT be mistaken for
# a gate (the fallback keys off `on_reject`, which only GateStep writes).
from types import SimpleNamespace
from specify_cli import _gate_outcome

state = SimpleNamespace(
status=SimpleNamespace(value="paused"),
current_step_id="run-tests",
step_results={
"run-tests": {"output": {"exit_code": 0, "stdout": "ok"}},
},
)
assert _gate_outcome(state) is None