Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
775 changes: 761 additions & 14 deletions apps/api/openapi.json

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions apps/api/src/cora/infrastructure/canonical_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Single-source canonical JSON encoder for deterministic content hashing.

Stable byte output for the same logical value: sorted keys, no
whitespace, UTF-8 encoded. Per [[project-run-procedure-replay-design]]
both write-time hashing (decider) and replay-time hashing (handler)
call this helper so recorded content-address pins reproduce across
processes. Lives in infrastructure because the aggregates layer
(which produces canonical bytes for event payload persistence) cannot
import from BC-local helper modules; infrastructure is the lowest
common denominator across `cora.operation.aggregates` + handlers + the
shared `_recipe_expansion` helper.

The architecture fitness in tests/architecture restricts
`json.dumps(sort_keys=True)` co-occurrence in the `cora.operation` and
`cora.recipe` trees to the few sites that re-export this helper.
Callers needing a dict-typed JSON value for persistence wrap as
`json.loads(canonical_json_bytes(...))`; the wrapper stays inline at
each call site rather than being hoisted so non-persisting callers do
not pay a parse-then-stringify roundtrip. See replay-design Anti-hook 18.
"""

import json


def canonical_json_bytes(value: object) -> bytes:
"""Encode `value` as canonical JSON bytes.

Equivalent to `json.dumps(value, sort_keys=True, separators=(",", ":")).encode("utf-8")`.
Use this helper everywhere a deterministic byte representation is
needed for hashing or content-addressed storage in the operation +
recipe BC trees.
"""
return json.dumps(value, sort_keys=True, separators=(",", ":")).encode("utf-8")


__all__ = ["canonical_json_bytes"]
139 changes: 139 additions & 0 deletions apps/api/src/cora/operation/_recipe_expansion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Pure `expand` for Recipe step tuples -> Conductor `Step` lists.

Cross-BC bridge: the Recipe BC's `RecipeStep` union + `BindingRef`
sentinel describe parameterized scan recipes; the Operation BC's `Step`
union (`SetpointStep | ActionStep | CheckStep`) is what the Conductor
walks. The direction Operation -> Recipe is the allowed dependency
edge (tach-enforced), so this expansion bridge lives here.

Per [[project-recipe-aggregate-design]] the expansion contract is
pure: no clock, no port I/O, no randomness, no module-global state.
Same inputs `(steps, bindings)` yield identical outputs. The
`register_procedure_from_recipe` slice re-runs `expand` once at
validation time and compares results to enforce determinism via the
`RecipeExpansionDeterminismError` rejection.
"""

from collections.abc import Mapping
from typing import Any

from cora.infrastructure.canonical_json import canonical_json_bytes
from cora.operation.conductor import (
ActionStep,
CheckStep,
EqualsCriterion,
SetpointStep,
Step,
WithinToleranceCriterion,
)
from cora.recipe.aggregates.recipe import (
RecipeActionStep,
RecipeSetpointStep,
RecipeStep,
)
from cora.recipe.aggregates.recipe.body import resolve_value


def _criterion_from_wire(
payload: Mapping[str, Any],
) -> EqualsCriterion | WithinToleranceCriterion:
"""Translate a `RecipeCheckStep.criterion` wire dict to the typed union.

Mirrors the Conductor's `_criterion_to_dict` serialization shape
arm-for-arm. Extension: a new criterion kind lands in three places:
the Conductor's `_criterion_to_dict` / `_criterion_matches` arms,
this function's arms, and the matching test in
`test_recipe_step_variants_match_step_union`.
"""
kind = payload["kind"]
if kind == "equals":
return EqualsCriterion(expected=payload["expected"])
if kind == "within_tolerance":
return WithinToleranceCriterion(
expected=payload["expected"], tolerance=payload["tolerance"]
)
msg = f"unknown criterion kind: {kind!r}"
raise ValueError(msg)


def _expand_step(step: RecipeStep, bindings: Mapping[str, Any]) -> Step:
"""Expand one recipe step into a concrete `Step` per the union arm."""
if isinstance(step, RecipeSetpointStep):
return SetpointStep(
address=step.address,
value=resolve_value(step.value, bindings),
verify=step.verify,
)
if isinstance(step, RecipeActionStep):
return ActionStep(
name=step.name,
params={key: resolve_value(val, bindings) for key, val in step.params.items()},
)
# RecipeCheckStep: criterion is a wire-format dict (kept dict-shaped
# in Recipe BC to avoid an Operation -> Recipe import).
return CheckStep(
address=step.address,
criterion=_criterion_from_wire(step.criterion),
)


def expand(steps: tuple[RecipeStep, ...], bindings: Mapping[str, Any]) -> tuple[Step, ...]:
"""Expand `steps` against `bindings` to a flat tuple of Conductor `Step`s.

Pure function: same inputs yield identical outputs. Order of `steps`
is preserved.

Raises `UnboundRecipeBindingError` (from `cora.recipe.aggregates.recipe`)
if any `BindingRef.name` in `steps` is missing from `bindings`. Raises
`ValueError` for unknown criterion kinds in a `RecipeCheckStep`. Extra
bindings (keys in `bindings` that no `BindingRef` references) are
silently ignored.
"""
return tuple(_expand_step(step, bindings) for step in steps)


def _criterion_to_wire(
criterion: EqualsCriterion | WithinToleranceCriterion,
) -> dict[str, Any]:
"""Mirrors `_criterion_from_wire`: typed -> wire dict."""
if isinstance(criterion, EqualsCriterion):
return {"kind": "equals", "expected": criterion.expected}
return {
"kind": "within_tolerance",
"expected": criterion.expected,
"tolerance": criterion.tolerance,
}


def _step_to_wire(step: Step) -> dict[str, Any]:
if isinstance(step, SetpointStep):
return {
"kind": "setpoint",
"address": step.address,
"value": step.value,
"verify": step.verify,
}
if isinstance(step, ActionStep):
return {
"kind": "action",
"name": step.name,
"params": dict(step.params),
}
return {
"kind": "check",
"address": step.address,
"criterion": _criterion_to_wire(step.criterion),
}


def steps_to_wire(steps: tuple[Step, ...]) -> list[dict[str, Any]]:
"""Canonical list-of-dicts for hashing or persisting expanded Steps.

Downstream re-expansion (run-time replay) reuses this serializer
to recompute `steps_hash` from a freshly-expanded Recipe and
confirm it matches the `RecipeExpansionRecorded.steps_hash` pin.
"""
return [_step_to_wire(step) for step in steps]


__all__ = ["canonical_json_bytes", "expand", "steps_to_wire"]
148 changes: 148 additions & 0 deletions apps/api/src/cora/operation/_recipe_replay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Recipe-expansion replay helpers for the `conduct_procedure` handler.

Per [[project-run-procedure-replay-design]] the run-time replay path
locates the genesis `RecipeExpansionRecorded` provenance event in a
Procedure stream, extracts the pinned hash + bindings + port-version
tuple, then verifies a freshly-re-expanded `tuple[Step, ...]` matches
the recorded pins. This module collects the pure helpers; the handler
threads them after authz + Procedure load.

This is the FIRST handler-tier site in CORA that reads
`StoredEvent.payload` directly outside a projection. Per replay-design
§Locks the rule-of-three threshold gates promoting the helper to a
shared module: when a SECOND handler (any BC) needs payload-direct
access, hoist `find_recipe_expansion_record` to a generic
`cora.infrastructure.event_payload` helper. For comparison, projections
also read `.payload` but at projection-fold time, not at
handler-orchestration time. See replay-design Anti-hook 12.
"""

import hashlib
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import Any, Literal
from uuid import UUID

from cora.infrastructure.canonical_json import canonical_json_bytes
from cora.infrastructure.ports.event_store import StoredEvent
from cora.operation._recipe_expansion import steps_to_wire
from cora.operation.aggregates.procedure import (
RecipeExpansionRecordNotFoundError,
RecipeExpansionReplayMismatchError,
)
from cora.operation.conductor import Step


@dataclass(frozen=True)
class RecipeExpansionPins:
"""The replay-pinned subset of a `RecipeExpansionRecorded` payload.

Constructed by `pins_from_payload`. Carries only the fields the
replay path needs (control flow), NOT the audit-only fields
(procedure_id, recipe_id, capability_id, capability_version,
step_count, occurred_at) which are read directly at the handler
entry for logging.
"""

recipe_version: str | None
bindings: Mapping[str, Any]
bindings_hash: str
steps_hash: str
expansion_port_version: str


def find_recipe_expansion_record(
stored_events: Iterable[StoredEvent],
) -> StoredEvent | None:
"""Locate the `RecipeExpansionRecorded` event in a Procedure stream.

Scans linearly from head, returns the first match, early-exits on
first hit. In well-formed Recipe-driven Procedure streams the match
lands at index 1 (the second event in the genesis 2-event block
emitted by `register_procedure_from_recipe`); the unit test pins
this position invariant. Tail-scan is wrong: only the genesis
`RecipeExpansionRecorded` defines the replay snapshot.

Returns `None` when no match. The caller decides whether None is
expected (legacy Procedure with `recipe_id is None`) or an error
(recipe-driven Procedure missing its provenance event, raised as
`RecipeExpansionRecordNotFoundError` by the handler).
"""
for event in stored_events:
if event.event_type == "RecipeExpansionRecorded":
return event
return None


_REQUIRED_PINS_KEYS = (
"bindings",
"bindings_hash",
"expansion_port_version",
"steps_hash",
)


def pins_from_payload(procedure_id: UUID, payload: Mapping[str, Any]) -> RecipeExpansionPins:
"""Extract the replay-pinned subset from a `RecipeExpansionRecorded` payload.

Defensive: raises `RecipeExpansionRecordNotFoundError(procedure_id)`
if any required key is missing (covers the corrupt-payload case
distinct from missing-event case; both surface the same error
family per the replay-design lock on triage simplicity).
"""
missing = [key for key in _REQUIRED_PINS_KEYS if key not in payload]
if missing:
raise RecipeExpansionRecordNotFoundError(procedure_id)
return RecipeExpansionPins(
recipe_version=payload.get("recipe_version"),
bindings=dict(payload["bindings"]),
bindings_hash=payload["bindings_hash"],
steps_hash=payload["steps_hash"],
expansion_port_version=payload["expansion_port_version"],
)


def verify_bindings_hash(procedure_id: UUID, pins: RecipeExpansionPins) -> None:
"""Verify the recorded `bindings` payload still hashes to `bindings_hash`.

Raises `RecipeExpansionReplayMismatchError(procedure_id, "bindings")`
on mismatch. Bindings drift is input drift (the recorded payload
no longer canonicalizes to its recorded hash, i.e. payload
corruption); failing it BEFORE the steps check isolates the failure
mode in the discriminator value, easier to triage than a downstream
steps mismatch caused by upstream binding corruption.
"""
recomputed = hashlib.sha256(canonical_json_bytes(dict(pins.bindings))).hexdigest()
if recomputed != pins.bindings_hash:
raise RecipeExpansionReplayMismatchError(procedure_id, "bindings")


def verify_steps_hash(
procedure_id: UUID,
steps: tuple[Step, ...],
pins: RecipeExpansionPins,
) -> None:
"""Verify the re-expanded steps still hash to the recorded `steps_hash`.

Raises `RecipeExpansionReplayMismatchError(procedure_id, "steps")`
on mismatch. Steps drift is expansion-logic drift (the port
produces different output for the same input than at write time);
runs AFTER `verify_bindings_hash` because steps drift downstream
of bindings is a confusing diagnostic.
"""
recomputed = hashlib.sha256(canonical_json_bytes(steps_to_wire(steps))).hexdigest()
if recomputed != pins.steps_hash:
raise RecipeExpansionReplayMismatchError(procedure_id, "steps")


MismatchField = Literal["bindings", "steps"]


__all__ = [
"MismatchField",
"RecipeExpansionPins",
"find_recipe_expansion_record",
"pins_from_payload",
"verify_bindings_hash",
"verify_steps_hash",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Default `RecipeExpansionPort` adapter: pure delegation to `expand`.

Wraps the module-level `cora.operation._recipe_expansion.expand` function
in a Protocol-conforming object and pins a stable `version` string. The
version is recorded in `RecipeExpansionRecorded` provenance events so
replay can verify which expander produced a given step sequence.

A new expander version is a code change here: bump `version` to the
next stable tag when expansion semantics change in a way that affects
already-recorded provenance events.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import Mapping

from cora.operation.conductor import Step
from cora.recipe.aggregates.recipe import RecipeStep

_DEFAULT_VERSION = "v1"


@dataclass(frozen=True)
class InMemoryRecipeExpansionPort:
"""Pure-function `RecipeExpansionPort` backed by the default `expand`.

`version` defaults to `"v1"` and is rarely overridden in production;
tests pass a different version when they need to assert provenance
carries the expander identity.
"""

version: str = _DEFAULT_VERSION

def expand(
self, steps: tuple[RecipeStep, ...], bindings: Mapping[str, Any]
) -> tuple[Step, ...]:
from cora.operation._recipe_expansion import expand as _expand

return _expand(steps, bindings)


__all__ = ["InMemoryRecipeExpansionPort"]
Loading
Loading