Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions docs/policy-fabric-hook.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,45 @@ Decision fields:
"reason": "secure_lane_requires_explicit_grant",
"subject": "sourceos-syncd",
"object_id": null,
"data_class": "internal"
"data_class": "internal",
"decision_boundary": {
"decision_scope": "policy-only",
"runtime_effect_performed": false,
"authority_mutation_performed": false,
"state_repair_performed": false,
"ledger_write_performed": false,
"downstream_refs": [
"SourceOS-Linux/sourceos-spec#113",
"SourceOS-Linux/sourceos-syncd#30"
]
}
}
```

## Boundary invariant

The local hook emits policy decisions only.

It does not perform:

- runtime effects;
- agent grant or authority mutation;
- state repair;
- ledger writes;
- replication;
- bridge export;
- memory writeback.

The hard chain is:

```text
state observation/report input = evidence
policy decision = local or remote policy evaluation
runtime effect = separate admission/effect decision
authority/grant mutation = separate Agent Registry / grant-state decision
state integrity report = ledger/report evidence only
```

## Local default behavior

The local stub is conservative:
Expand Down Expand Up @@ -85,7 +120,7 @@ Store-backed reports now include local policy evaluation output:
}
```

The top-level `policy.policy_decisions` field remains numeric and dashboard-friendly. Detailed sample decisions live under `diagnosis.policy` so the State Integrity Report top-level contract remains closed.
The top-level `policy.policy_decisions` field remains numeric and dashboard-friendly. Detailed sample decisions live under `diagnosis.policy` so the State Integrity Report top-level contract remains closed. Every sample decision carries `decision_boundary` so report readers can distinguish policy evaluation from runtime action.

## Intended PolicyFabric replacement path

Expand All @@ -94,14 +129,21 @@ The local stub should later be replaced by a PolicyFabric client that preserves
1. Build `PolicyRequest` from lane, action, subject, object, and data class.
2. Send request to PolicyFabric.
3. Receive `sourceos.policy-decision/v1alpha1` response.
4. Count statuses into `policy.policy_decisions`.
5. Attach representative explanations under `diagnosis.policy.sample`.
6. Preserve sensitive object details through redaction, not omission.
4. Confirm `decision_boundary.decision_scope = policy-only`.
5. Count statuses into `policy.policy_decisions`.
6. Attach representative explanations under `diagnosis.policy.sample`.
7. Preserve sensitive object details through redaction, not omission.

## Estate expectations

- PolicyFabric owns final policy semantics.
- SourceOS runtime-effect decisions are separate from policy decisions.
- Agent Registry / grant-state decisions are separate from policy decisions.
- AgentPlane consumes policy decisions before agent action.
- Lampstand preserves significant policy decisions as evidence.
- Delivery Excellence scores policy friction and redaction rates.
- Secure and repair lanes fail closed until explicit grants exist.

## Validation

The unit tests reject policy decisions that claim to perform runtime effects or authority mutation. This prevents the local hook from becoming a hidden execution or grant-state surface.
58 changes: 57 additions & 1 deletion src/sourceos_syncd/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
The real PolicyFabric service will eventually own policy evaluation. This module
provides a local, deterministic contract-compatible evaluator so State Integrity
Reports can already carry policy counts and explanation codes.

Boundary invariant: this module emits policy decisions only. It does not perform
runtime effects, mutate agent grants, repair state, write ledgers, or replicate
payloads. Downstream systems must consume explicit decision refs before taking
any action.
"""

from __future__ import annotations
Expand All @@ -15,6 +20,7 @@

POLICY_DECISION_SCHEMA = "sourceos.policy-decision/v1alpha1"
POLICY_ENGINE = "policy-fabric-local-stub"
DECISION_SCOPE = "policy-only"

ACTIONS = {
"index",
Expand Down Expand Up @@ -44,6 +50,31 @@ class PolicyRequest:
context: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class DecisionBoundary:
"""Hard boundary carried by local policy decisions."""

decision_scope: str = DECISION_SCOPE
runtime_effect_performed: bool = False
authority_mutation_performed: bool = False
state_repair_performed: bool = False
ledger_write_performed: bool = False
downstream_refs: tuple[str, ...] = (
"SourceOS-Linux/sourceos-spec#113",
"SourceOS-Linux/sourceos-syncd#30",
)

def as_dict(self) -> dict[str, Any]:
return {
"decision_scope": self.decision_scope,
"runtime_effect_performed": self.runtime_effect_performed,
"authority_mutation_performed": self.authority_mutation_performed,
"state_repair_performed": self.state_repair_performed,
"ledger_write_performed": self.ledger_write_performed,
"downstream_refs": list(self.downstream_refs),
}


@dataclass(frozen=True)
class PolicyDecision:
decision_id: str
Expand All @@ -57,6 +88,7 @@ class PolicyDecision:
engine: str = POLICY_ENGINE
schema: str = POLICY_DECISION_SCHEMA
generated_at: str = field(default_factory=utc_now)
boundary: DecisionBoundary = field(default_factory=DecisionBoundary)

def as_dict(self) -> dict[str, Any]:
return {
Expand All @@ -71,9 +103,28 @@ def as_dict(self) -> dict[str, Any]:
"subject": self.subject,
"object_id": self.object_id,
"data_class": self.data_class,
"decision_boundary": self.boundary.as_dict(),
}


def validate_decision_boundary(decision: dict[str, Any]) -> None:
"""Reject collapsed policy→runtime/authority/state records."""

boundary = decision.get("decision_boundary")
if not isinstance(boundary, dict):
raise ValueError("policy decision missing decision_boundary")
if boundary.get("decision_scope") != DECISION_SCOPE:
raise ValueError("policy decision scope must be policy-only")
for key in (
"runtime_effect_performed",
"authority_mutation_performed",
"state_repair_performed",
"ledger_write_performed",
):
if boundary.get(key) is not False:
raise ValueError(f"policy decision must not perform {key}")


def _decision_id(request: PolicyRequest, status: str, reason: str) -> str:
payload = {
"action": request.action,
Expand Down Expand Up @@ -131,13 +182,16 @@ def evaluate_report_policy(lanes: list[dict[str, Any]], subject: str = "sourceos
lane_name = str(lane.get("name", "normal"))
for action in ("index", "retain", "replicate", "agent_access"):
decision = evaluate_policy(PolicyRequest(action=action, lane=lane_name, subject=subject))
decisions.append(decision.as_dict())
payload = decision.as_dict()
validate_decision_boundary(payload)
decisions.append(payload)
return decisions


def decision_counts(decisions: list[dict[str, Any]]) -> dict[str, int]:
counts = {status: 0 for status in sorted(STATUSES)}
for decision in decisions:
validate_decision_boundary(decision)
status = str(decision.get("status", "deferred"))
if status not in counts:
status = "deferred"
Expand All @@ -146,6 +200,8 @@ def decision_counts(decisions: list[dict[str, Any]]) -> dict[str, int]:


def policy_summary(decisions: list[dict[str, Any]], sample_limit: int = 12) -> dict[str, Any]:
for decision in decisions:
validate_decision_boundary(decision)
return {
"engine": POLICY_ENGINE,
"counts": decision_counts(decisions),
Expand Down
35 changes: 34 additions & 1 deletion tests/test_policy_hook.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
from __future__ import annotations

from sourceos_syncd.policy import PolicyRequest, decision_counts, evaluate_policy, evaluate_report_policy
import pytest

from sourceos_syncd.policy import (
PolicyRequest,
decision_counts,
evaluate_policy,
evaluate_report_policy,
validate_decision_boundary,
)
from sourceos_syncd.store_reports import init_store, snapshot_from_store


def test_secure_lane_agent_access_is_denied():
decision = evaluate_policy(PolicyRequest(action="agent_access", lane="secure"))
assert decision.status == "denied"
assert decision.reason == "secure_lane_requires_explicit_grant"
payload = decision.as_dict()
assert payload["decision_boundary"]["decision_scope"] == "policy-only"
assert payload["decision_boundary"]["runtime_effect_performed"] is False
assert payload["decision_boundary"]["authority_mutation_performed"] is False


def test_secure_lane_indexing_is_redacted():
Expand All @@ -22,6 +34,20 @@ def test_unknown_action_is_deferred():
assert decision.reason == "unknown_action"


def test_policy_decision_rejects_collapsed_runtime_effect():
decision = evaluate_policy(PolicyRequest(action="replicate", lane="normal")).as_dict()
decision["decision_boundary"]["runtime_effect_performed"] = True
with pytest.raises(ValueError, match="runtime_effect_performed"):
validate_decision_boundary(decision)


def test_policy_decision_rejects_collapsed_authority_mutation():
decision = evaluate_policy(PolicyRequest(action="agent_access", lane="secure")).as_dict()
decision["decision_boundary"]["authority_mutation_performed"] = True
with pytest.raises(ValueError, match="authority_mutation_performed"):
validate_decision_boundary(decision)


def test_report_policy_counts_include_all_statuses():
lanes = [{"name": "normal"}, {"name": "secure"}, {"name": "repair"}, {"name": "ephemeral"}]
decisions = evaluate_report_policy(lanes)
Expand All @@ -30,6 +56,10 @@ def test_report_policy_counts_include_all_statuses():
assert counts["denied"] > 0
assert counts["redacted"] > 0
assert counts["deferred"] > 0
for decision in decisions:
assert decision["decision_boundary"]["decision_scope"] == "policy-only"
assert decision["decision_boundary"]["state_repair_performed"] is False
assert decision["decision_boundary"]["ledger_write_performed"] is False


def test_store_backed_snapshot_includes_policy_summary(tmp_path):
Expand All @@ -39,3 +69,6 @@ def test_store_backed_snapshot_includes_policy_summary(tmp_path):
assert report["policy"]["policy_version"] == "v0.1.0-local-stub"
assert report["diagnosis"]["policy"]["engine"] == "policy-fabric-local-stub"
assert report["diagnosis"]["policy"]["counts"]["allowed"] > 0
sample = report["diagnosis"]["policy"]["sample"]
assert sample
assert sample[0]["decision_boundary"]["decision_scope"] == "policy-only"
Loading