Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions features/F50-state-machine-hardening/SPEC.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
id: F50-state-machine-hardening
type: feature
summary: "Formalizar estados do pipeline com Enums e definir estados terminais explícitos"
inputs:
- N/A (Refatoração interna)
outputs:
- Código usando PipelineState(StrEnum)
- Comportamento consistente em estados terminais
acceptance_criteria:
- Todos os estados do pipeline devem ser definidos em um StrEnum
- Literais de string soltos devem ser removidos do código core
- Estados terminais (COMPLETED, FAILED, CANCELLED) devem ser definidos em um set imutável
- O worker deve parar imediatamente ao encontrar um estado terminal
non_goals:
- Alterar a lógica de transição existente (apenas a representação dos estados)
- Adicionar novos estados
---

## Contexto
Atualmente, os estados do pipeline (ex: "PLAN", "TEST_RED") são strings literais espalhadas pelo código (`pipeline.py`, `state_machine.py`, `worker.py`). Isso é propenso a erros de digitação e dificulta a análise estática. Além disso, a definição de "estado terminal" está implícita em algumas verificações, mas não centralizada.

## Objetivo
1. Criar `class PipelineState(StrEnum)` em `src/aignt_os/state_machine.py`.
2. Definir `TERMINAL_STATES` como um conjunto de `PipelineState`.
3. Refatorar `PipelineEngine`, `RuntimeWorker` e testes para usar o Enum.
13 changes: 7 additions & 6 deletions src/synapse_os/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
StepExecutor,
)
from synapse_os.security import compute_file_sha256, resolve_path_within_root, sanitize_clean_text
from synapse_os.state_machine import PipelineState
from synapse_os.supervisor import Supervisor, SupervisorDecision

ARTIFACT_DIR_MODE = 0o700
Expand Down Expand Up @@ -670,7 +671,7 @@ def _artifacts_for_step(
if result is not None:
return dict(result.artifacts)

if step.state == "SPEC_VALIDATION":
if step.state == PipelineState.SPEC_VALIDATION:
return {
key: value
for key, value in context.artifacts.items()
Expand Down Expand Up @@ -698,7 +699,7 @@ def run(
self,
spec_path: Path,
*,
stop_at: str = "TEST_RED",
stop_at: str = PipelineState.TEST_RED,
initiated_by: str = "system",
spec_hash: str | None = None,
) -> PipelineContext:
Expand All @@ -714,7 +715,7 @@ def create_pending_run(
self,
spec_path: Path,
*,
stop_at: str = "TEST_RED",
stop_at: str = PipelineState.TEST_RED,
initiated_by: str = "system",
spec_hash: str | None = None,
) -> str:
Expand Down Expand Up @@ -747,7 +748,7 @@ def check_cancellation(self, _: PipelineContext) -> bool:

executors = dict(self.executors)
executors.setdefault(
"DOCUMENT",
PipelineState.DOCUMENT,
_RunReportStepExecutor(
repository=self.repository,
artifact_store=self.artifact_store,
Expand Down Expand Up @@ -779,14 +780,14 @@ def _create_pending_run_with_provenance(
)
run_id = self.repository.create_run(
spec_path=resolved_spec_path,
initial_state="REQUEST",
initial_state=PipelineState.REQUEST,
stop_at=stop_at,
spec_hash=persisted_spec_hash,
initiated_by=initiated_by,
)
self.repository.record_event(
run_id,
state="REQUEST",
state=PipelineState.REQUEST,
event_type="security_provenance_recorded",
message=(
f"Provenance recorded for initiated_by={initiated_by} "
Expand Down
108 changes: 56 additions & 52 deletions src/synapse_os/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from synapse_os.specs import (
SpecValidationError as _SpecValidationError,
)
from synapse_os.state_machine import LINEAR_STATE_FLOW, SynapseStateMachine
from synapse_os.state_machine import LINEAR_STATE_FLOW, PipelineState, SynapseStateMachine
from synapse_os.supervisor import (
RetryableStepError,
Supervisor,
Expand All @@ -22,28 +22,28 @@

PRIMARY_EXECUTOR_ROUTE = "primary"
PIPELINE_STOP_STATES = (
"SPEC_VALIDATION",
"PLAN",
"TEST_RED",
"CODE_GREEN",
"QUALITY_GATE",
"REVIEW",
"SECURITY",
"DOCUMENT",
PipelineState.SPEC_VALIDATION,
PipelineState.PLAN,
PipelineState.TEST_RED,
PipelineState.CODE_GREEN,
PipelineState.QUALITY_GATE,
PipelineState.REVIEW,
PipelineState.SECURITY,
PipelineState.DOCUMENT,
)
PIPELINE_ENTRY_STATES = (
"REQUEST",
"SPEC_DISCOVERY",
"SPEC_NORMALIZATION",
"SPEC_VALIDATION",
"PLAN",
"TEST_RED",
"CODE_GREEN",
"QUALITY_GATE",
"REVIEW",
"SECURITY",
"DOCUMENT",
"COMPLETE",
PipelineState.REQUEST,
PipelineState.SPEC_DISCOVERY,
PipelineState.SPEC_NORMALIZATION,
PipelineState.SPEC_VALIDATION,
PipelineState.PLAN,
PipelineState.TEST_RED,
PipelineState.CODE_GREEN,
PipelineState.QUALITY_GATE,
PipelineState.REVIEW,
PipelineState.SECURITY,
PipelineState.DOCUMENT,
PipelineState.COMPLETE,
)


Expand Down Expand Up @@ -127,36 +127,36 @@ def on_supervisor_decision(


PIPELINE_STEPS: dict[str, PipelineStep] = {
"SPEC_VALIDATION": PipelineStep(
state="SPEC_VALIDATION",
PipelineState.SPEC_VALIDATION: PipelineStep(
state=PipelineState.SPEC_VALIDATION,
description="Validate the feature SPEC before planning.",
),
"PLAN": PipelineStep(
state="PLAN",
PipelineState.PLAN: PipelineStep(
state=PipelineState.PLAN,
description="Produce the planning hand-off for the current feature.",
),
"TEST_RED": PipelineStep(
state="TEST_RED",
PipelineState.TEST_RED: PipelineStep(
state=PipelineState.TEST_RED,
description="Produce the failing test hand-off for the current feature.",
),
"CODE_GREEN": PipelineStep(
state="CODE_GREEN",
PipelineState.CODE_GREEN: PipelineStep(
state=PipelineState.CODE_GREEN,
description="Produce the minimal implementation to satisfy the failing tests.",
),
"QUALITY_GATE": PipelineStep(
state="QUALITY_GATE",
PipelineState.QUALITY_GATE: PipelineStep(
state=PipelineState.QUALITY_GATE,
description="Validate tests, lint, typecheck and regression before security review.",
),
"REVIEW": PipelineStep(
state="REVIEW",
PipelineState.REVIEW: PipelineStep(
state=PipelineState.REVIEW,
description="Review the current delta and request rework when needed.",
),
"SECURITY": PipelineStep(
state="SECURITY",
PipelineState.SECURITY: PipelineStep(
state=PipelineState.SECURITY,
description="Review security-sensitive aspects before reporting completion.",
),
"DOCUMENT": PipelineStep(
state="DOCUMENT",
PipelineState.DOCUMENT: PipelineStep(
state=PipelineState.DOCUMENT,
description="Generate the final RUN_REPORT.md for the current run.",
),
}
Expand Down Expand Up @@ -217,38 +217,42 @@ def run(

current_state = self.state_machine.current_state

if current_state in {"REQUEST", "SPEC_DISCOVERY", "SPEC_NORMALIZATION"}:
if current_state in {
PipelineState.REQUEST,
PipelineState.SPEC_DISCOVERY,
PipelineState.SPEC_NORMALIZATION,
}:
self.state_machine.advance_to(self._next_state(current_state))
context.current_state = self.state_machine.current_state
continue

if current_state == "COMPLETE":
if current_state == PipelineState.COMPLETE:
context.current_state = current_state
if self.observer is not None:
self.observer.on_run_completed(context)
return context

if current_state == "SPEC_VALIDATION":
if current_state == PipelineState.SPEC_VALIDATION:
current_step = PIPELINE_STEPS[current_state]
self._execute_spec_validation(context)
if self.observer is not None:
self.observer.on_step_completed(current_step, context, None)
if stop_at == "SPEC_VALIDATION":
if stop_at == PipelineState.SPEC_VALIDATION:
if self.observer is not None:
self.observer.on_run_completed(context)
return context
self.state_machine.advance_to("PLAN")
self.state_machine.advance_to(PipelineState.PLAN)
context.current_state = self.state_machine.current_state
continue

if current_state in {
"PLAN",
"TEST_RED",
"CODE_GREEN",
"QUALITY_GATE",
"REVIEW",
"SECURITY",
"DOCUMENT",
PipelineState.PLAN,
PipelineState.TEST_RED,
PipelineState.CODE_GREEN,
PipelineState.QUALITY_GATE,
PipelineState.REVIEW,
PipelineState.SECURITY,
PipelineState.DOCUMENT,
}:
current_step = PIPELINE_STEPS[current_state]
result = self._run_runtime_step(current_step, context)
Expand Down Expand Up @@ -280,8 +284,8 @@ def _execute_spec_validation(self, context: PipelineContext) -> None:
context.validated_spec = spec_document
context.artifacts["spec_id"] = spec_document.metadata.id
context.artifacts["spec_summary"] = spec_document.metadata.summary
context.step_history.append("SPEC_VALIDATION")
context.current_state = "SPEC_VALIDATION"
context.step_history.append(PipelineState.SPEC_VALIDATION)
context.current_state = PipelineState.SPEC_VALIDATION

def _execute_runtime_step(
self,
Expand Down Expand Up @@ -340,7 +344,7 @@ def _run_runtime_step(
if decision.action == "return_to_code_green":
context.step_history.append(step.state)
context.current_state = step.state
self.state_machine.advance_to("CODE_GREEN")
self.state_machine.advance_to(PipelineState.CODE_GREEN)
context.current_state = self.state_machine.current_state
return None

Expand Down
3 changes: 2 additions & 1 deletion src/synapse_os/runtime/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
RunRepository,
)
from synapse_os.runtime.state import RuntimeState, RuntimeStateStore
from synapse_os.state_machine import PipelineState

LEGACY_INITIATED_BY_VALUES = frozenset({"unknown", "system", "local_cli"})
RUNTIME_OWNER_SKIP_EVENT = "runtime_owner_skip"
Expand Down Expand Up @@ -82,7 +83,7 @@ def _record_owner_skip_if_needed(self, run_record: RunRecord, *, runtime_owner:
return
self.repository.record_event(
run_record.run_id,
state="REQUEST",
state=PipelineState.REQUEST,
event_type=RUNTIME_OWNER_SKIP_EVENT,
message=message,
)
Expand Down
Loading
Loading