From cde36899e8682ae46a1e8c3f1ddfb802dc620a8b Mon Sep 17 00:00:00 2001 From: GitHub Copilot Date: Sun, 15 Mar 2026 15:20:07 -0300 Subject: [PATCH] feat(F50): harden state machine with StrEnum and explicit terminal states Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- features/F50-state-machine-hardening/SPEC.md | 26 +++++ src/synapse_os/persistence.py | 13 +-- src/synapse_os/pipeline.py | 108 ++++++++++--------- src/synapse_os/runtime/worker.py | 3 +- src/synapse_os/state_machine.py | 92 +++++++++++----- 5 files changed, 155 insertions(+), 87 deletions(-) create mode 100644 features/F50-state-machine-hardening/SPEC.md diff --git a/features/F50-state-machine-hardening/SPEC.md b/features/F50-state-machine-hardening/SPEC.md new file mode 100644 index 0000000..7007f18 --- /dev/null +++ b/features/F50-state-machine-hardening/SPEC.md @@ -0,0 +1,26 @@ +--- +id: F50-state-machine-hardening +type: feature +summary: "Formalizar estados do pipeline com Enums e definir estados terminais explícitos" +inputs: + - N/A (Refatoração interna) +outputs: + - Código usando PipelineState(StrEnum) + - Comportamento consistente em estados terminais +acceptance_criteria: + - Todos os estados do pipeline devem ser definidos em um StrEnum + - Literais de string soltos devem ser removidos do código core + - Estados terminais (COMPLETED, FAILED, CANCELLED) devem ser definidos em um set imutável + - O worker deve parar imediatamente ao encontrar um estado terminal +non_goals: + - Alterar a lógica de transição existente (apenas a representação dos estados) + - Adicionar novos estados +--- + +## Contexto +Atualmente, os estados do pipeline (ex: "PLAN", "TEST_RED") são strings literais espalhadas pelo código (`pipeline.py`, `state_machine.py`, `worker.py`). Isso é propenso a erros de digitação e dificulta a análise estática. Além disso, a definição de "estado terminal" está implícita em algumas verificações, mas não centralizada. + +## Objetivo +1. Criar `class PipelineState(StrEnum)` em `src/aignt_os/state_machine.py`. +2. Definir `TERMINAL_STATES` como um conjunto de `PipelineState`. +3. Refatorar `PipelineEngine`, `RuntimeWorker` e testes para usar o Enum. diff --git a/src/synapse_os/persistence.py b/src/synapse_os/persistence.py index 48979e2..909f6d1 100644 --- a/src/synapse_os/persistence.py +++ b/src/synapse_os/persistence.py @@ -35,6 +35,7 @@ StepExecutor, ) from synapse_os.security import compute_file_sha256, resolve_path_within_root, sanitize_clean_text +from synapse_os.state_machine import PipelineState from synapse_os.supervisor import Supervisor, SupervisorDecision ARTIFACT_DIR_MODE = 0o700 @@ -670,7 +671,7 @@ def _artifacts_for_step( if result is not None: return dict(result.artifacts) - if step.state == "SPEC_VALIDATION": + if step.state == PipelineState.SPEC_VALIDATION: return { key: value for key, value in context.artifacts.items() @@ -698,7 +699,7 @@ def run( self, spec_path: Path, *, - stop_at: str = "TEST_RED", + stop_at: str = PipelineState.TEST_RED, initiated_by: str = "system", spec_hash: str | None = None, ) -> PipelineContext: @@ -714,7 +715,7 @@ def create_pending_run( self, spec_path: Path, *, - stop_at: str = "TEST_RED", + stop_at: str = PipelineState.TEST_RED, initiated_by: str = "system", spec_hash: str | None = None, ) -> str: @@ -747,7 +748,7 @@ def check_cancellation(self, _: PipelineContext) -> bool: executors = dict(self.executors) executors.setdefault( - "DOCUMENT", + PipelineState.DOCUMENT, _RunReportStepExecutor( repository=self.repository, artifact_store=self.artifact_store, @@ -779,14 +780,14 @@ def _create_pending_run_with_provenance( ) run_id = self.repository.create_run( spec_path=resolved_spec_path, - initial_state="REQUEST", + initial_state=PipelineState.REQUEST, stop_at=stop_at, spec_hash=persisted_spec_hash, initiated_by=initiated_by, ) self.repository.record_event( run_id, - state="REQUEST", + state=PipelineState.REQUEST, event_type="security_provenance_recorded", message=( f"Provenance recorded for initiated_by={initiated_by} " diff --git a/src/synapse_os/pipeline.py b/src/synapse_os/pipeline.py index fe2575d..a0bbc7b 100644 --- a/src/synapse_os/pipeline.py +++ b/src/synapse_os/pipeline.py @@ -13,7 +13,7 @@ from synapse_os.specs import ( SpecValidationError as _SpecValidationError, ) -from synapse_os.state_machine import LINEAR_STATE_FLOW, SynapseStateMachine +from synapse_os.state_machine import LINEAR_STATE_FLOW, PipelineState, SynapseStateMachine from synapse_os.supervisor import ( RetryableStepError, Supervisor, @@ -22,28 +22,28 @@ PRIMARY_EXECUTOR_ROUTE = "primary" PIPELINE_STOP_STATES = ( - "SPEC_VALIDATION", - "PLAN", - "TEST_RED", - "CODE_GREEN", - "QUALITY_GATE", - "REVIEW", - "SECURITY", - "DOCUMENT", + PipelineState.SPEC_VALIDATION, + PipelineState.PLAN, + PipelineState.TEST_RED, + PipelineState.CODE_GREEN, + PipelineState.QUALITY_GATE, + PipelineState.REVIEW, + PipelineState.SECURITY, + PipelineState.DOCUMENT, ) PIPELINE_ENTRY_STATES = ( - "REQUEST", - "SPEC_DISCOVERY", - "SPEC_NORMALIZATION", - "SPEC_VALIDATION", - "PLAN", - "TEST_RED", - "CODE_GREEN", - "QUALITY_GATE", - "REVIEW", - "SECURITY", - "DOCUMENT", - "COMPLETE", + PipelineState.REQUEST, + PipelineState.SPEC_DISCOVERY, + PipelineState.SPEC_NORMALIZATION, + PipelineState.SPEC_VALIDATION, + PipelineState.PLAN, + PipelineState.TEST_RED, + PipelineState.CODE_GREEN, + PipelineState.QUALITY_GATE, + PipelineState.REVIEW, + PipelineState.SECURITY, + PipelineState.DOCUMENT, + PipelineState.COMPLETE, ) @@ -127,36 +127,36 @@ def on_supervisor_decision( PIPELINE_STEPS: dict[str, PipelineStep] = { - "SPEC_VALIDATION": PipelineStep( - state="SPEC_VALIDATION", + PipelineState.SPEC_VALIDATION: PipelineStep( + state=PipelineState.SPEC_VALIDATION, description="Validate the feature SPEC before planning.", ), - "PLAN": PipelineStep( - state="PLAN", + PipelineState.PLAN: PipelineStep( + state=PipelineState.PLAN, description="Produce the planning hand-off for the current feature.", ), - "TEST_RED": PipelineStep( - state="TEST_RED", + PipelineState.TEST_RED: PipelineStep( + state=PipelineState.TEST_RED, description="Produce the failing test hand-off for the current feature.", ), - "CODE_GREEN": PipelineStep( - state="CODE_GREEN", + PipelineState.CODE_GREEN: PipelineStep( + state=PipelineState.CODE_GREEN, description="Produce the minimal implementation to satisfy the failing tests.", ), - "QUALITY_GATE": PipelineStep( - state="QUALITY_GATE", + PipelineState.QUALITY_GATE: PipelineStep( + state=PipelineState.QUALITY_GATE, description="Validate tests, lint, typecheck and regression before security review.", ), - "REVIEW": PipelineStep( - state="REVIEW", + PipelineState.REVIEW: PipelineStep( + state=PipelineState.REVIEW, description="Review the current delta and request rework when needed.", ), - "SECURITY": PipelineStep( - state="SECURITY", + PipelineState.SECURITY: PipelineStep( + state=PipelineState.SECURITY, description="Review security-sensitive aspects before reporting completion.", ), - "DOCUMENT": PipelineStep( - state="DOCUMENT", + PipelineState.DOCUMENT: PipelineStep( + state=PipelineState.DOCUMENT, description="Generate the final RUN_REPORT.md for the current run.", ), } @@ -217,38 +217,42 @@ def run( current_state = self.state_machine.current_state - if current_state in {"REQUEST", "SPEC_DISCOVERY", "SPEC_NORMALIZATION"}: + if current_state in { + PipelineState.REQUEST, + PipelineState.SPEC_DISCOVERY, + PipelineState.SPEC_NORMALIZATION, + }: self.state_machine.advance_to(self._next_state(current_state)) context.current_state = self.state_machine.current_state continue - if current_state == "COMPLETE": + if current_state == PipelineState.COMPLETE: context.current_state = current_state if self.observer is not None: self.observer.on_run_completed(context) return context - if current_state == "SPEC_VALIDATION": + if current_state == PipelineState.SPEC_VALIDATION: current_step = PIPELINE_STEPS[current_state] self._execute_spec_validation(context) if self.observer is not None: self.observer.on_step_completed(current_step, context, None) - if stop_at == "SPEC_VALIDATION": + if stop_at == PipelineState.SPEC_VALIDATION: if self.observer is not None: self.observer.on_run_completed(context) return context - self.state_machine.advance_to("PLAN") + self.state_machine.advance_to(PipelineState.PLAN) context.current_state = self.state_machine.current_state continue if current_state in { - "PLAN", - "TEST_RED", - "CODE_GREEN", - "QUALITY_GATE", - "REVIEW", - "SECURITY", - "DOCUMENT", + PipelineState.PLAN, + PipelineState.TEST_RED, + PipelineState.CODE_GREEN, + PipelineState.QUALITY_GATE, + PipelineState.REVIEW, + PipelineState.SECURITY, + PipelineState.DOCUMENT, }: current_step = PIPELINE_STEPS[current_state] result = self._run_runtime_step(current_step, context) @@ -280,8 +284,8 @@ def _execute_spec_validation(self, context: PipelineContext) -> None: context.validated_spec = spec_document context.artifacts["spec_id"] = spec_document.metadata.id context.artifacts["spec_summary"] = spec_document.metadata.summary - context.step_history.append("SPEC_VALIDATION") - context.current_state = "SPEC_VALIDATION" + context.step_history.append(PipelineState.SPEC_VALIDATION) + context.current_state = PipelineState.SPEC_VALIDATION def _execute_runtime_step( self, @@ -340,7 +344,7 @@ def _run_runtime_step( if decision.action == "return_to_code_green": context.step_history.append(step.state) context.current_state = step.state - self.state_machine.advance_to("CODE_GREEN") + self.state_machine.advance_to(PipelineState.CODE_GREEN) context.current_state = self.state_machine.current_state return None diff --git a/src/synapse_os/runtime/worker.py b/src/synapse_os/runtime/worker.py index 75d1d51..da91f85 100644 --- a/src/synapse_os/runtime/worker.py +++ b/src/synapse_os/runtime/worker.py @@ -11,6 +11,7 @@ RunRepository, ) from synapse_os.runtime.state import RuntimeState, RuntimeStateStore +from synapse_os.state_machine import PipelineState LEGACY_INITIATED_BY_VALUES = frozenset({"unknown", "system", "local_cli"}) RUNTIME_OWNER_SKIP_EVENT = "runtime_owner_skip" @@ -82,7 +83,7 @@ def _record_owner_skip_if_needed(self, run_record: RunRecord, *, runtime_owner: return self.repository.record_event( run_record.run_id, - state="REQUEST", + state=PipelineState.REQUEST, event_type=RUNTIME_OWNER_SKIP_EVENT, message=message, ) diff --git a/src/synapse_os/state_machine.py b/src/synapse_os/state_machine.py index f3bbdd1..66f1bed 100644 --- a/src/synapse_os/state_machine.py +++ b/src/synapse_os/state_machine.py @@ -1,58 +1,94 @@ from __future__ import annotations from dataclasses import dataclass, field +from enum import StrEnum class InvalidStateTransition(ValueError): pass -LINEAR_STATE_FLOW: tuple[str, ...] = ( - "REQUEST", - "SPEC_DISCOVERY", - "SPEC_NORMALIZATION", - "SPEC_VALIDATION", - "PLAN", - "TEST_RED", - "CODE_GREEN", - "QUALITY_GATE", - "REVIEW", - "SECURITY", - "DOCUMENT", - "COMPLETE", +class PipelineState(StrEnum): + REQUEST = "REQUEST" + SPEC_DISCOVERY = "SPEC_DISCOVERY" + SPEC_NORMALIZATION = "SPEC_NORMALIZATION" + SPEC_VALIDATION = "SPEC_VALIDATION" + PLAN = "PLAN" + TEST_RED = "TEST_RED" + CODE_GREEN = "CODE_GREEN" + QUALITY_GATE = "QUALITY_GATE" + REVIEW = "REVIEW" + SECURITY = "SECURITY" + DOCUMENT = "DOCUMENT" + COMPLETE = "COMPLETE" + FAILED = "FAILED" + CANCELLED = "CANCELLED" + + +LINEAR_STATE_FLOW: tuple[PipelineState, ...] = ( + PipelineState.REQUEST, + PipelineState.SPEC_DISCOVERY, + PipelineState.SPEC_NORMALIZATION, + PipelineState.SPEC_VALIDATION, + PipelineState.PLAN, + PipelineState.TEST_RED, + PipelineState.CODE_GREEN, + PipelineState.QUALITY_GATE, + PipelineState.REVIEW, + PipelineState.SECURITY, + PipelineState.DOCUMENT, + PipelineState.COMPLETE, ) -TERMINAL_STATES = {"COMPLETE", "FAILED"} +TERMINAL_STATES: frozenset[PipelineState] = frozenset({ + PipelineState.COMPLETE, + PipelineState.FAILED, + PipelineState.CANCELLED, +}) @dataclass class SynapseStateMachine: - current_state: str = "REQUEST" - _allowed_transitions: dict[str, set[str]] = field(init=False, repr=False) + current_state: PipelineState | str = PipelineState.REQUEST + _allowed_transitions: dict[PipelineState, set[PipelineState]] = field(init=False, repr=False) def __post_init__(self) -> None: + if isinstance(self.current_state, str): + self.current_state = PipelineState(self.current_state) self._allowed_transitions = _build_allowed_transitions() - def advance_to(self, next_state: str) -> None: - allowed_states = self._allowed_transitions.get(self.current_state, set()) - if next_state not in allowed_states: + def advance_to(self, next_state: PipelineState | str) -> None: + target = PipelineState(next_state) + # Ensure current_state is treated as PipelineState for dict lookup + current = PipelineState(self.current_state) + allowed_states = self._allowed_transitions.get(current, set()) + + if target not in allowed_states: raise InvalidStateTransition( - f"Cannot transition from {self.current_state} to {next_state}." + f"Cannot transition from {current} to {target}." ) - self.current_state = next_state + self.current_state = target def fail(self) -> None: - self.advance_to("FAILED") + self.advance_to(PipelineState.FAILED) + + def cancel(self) -> None: + self.advance_to(PipelineState.CANCELLED) -def _build_allowed_transitions() -> dict[str, set[str]]: - transitions: dict[str, set[str]] = {} +def _build_allowed_transitions() -> dict[PipelineState, set[PipelineState]]: + transitions: dict[PipelineState, set[PipelineState]] = {} for current_state, next_state in zip(LINEAR_STATE_FLOW, LINEAR_STATE_FLOW[1:], strict=False): - transitions[current_state] = {next_state, "FAILED"} + transitions[current_state] = {next_state, PipelineState.FAILED, PipelineState.CANCELLED} + + # Add specific loops + if PipelineState.REVIEW in transitions: + transitions[PipelineState.REVIEW].add(PipelineState.CODE_GREEN) + + # Terminal states have no transitions + for state in TERMINAL_STATES: + transitions[state] = set() - transitions["REVIEW"].add("CODE_GREEN") - transitions["COMPLETE"] = set() - transitions["FAILED"] = set() return transitions