Skip to content

Commit 2dc57ff

Browse files
committed
feat: fix stratify
1 parent b1873a1 commit 2dc57ff

6 files changed

Lines changed: 200 additions & 37 deletions

File tree

docs/optimizations.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ Union-find over node identity merges components when nodes list dependencies at
155155
- **Feedback effect perpetually dirty status (Phase 8.1/8.2, noted 2026-04-06 QA — documented, accepted):** The `__feedback_effect_<condition>` node forwards DIRTY via default dispatch but consumes DATA. The effect stays in "dirty" status between emissions. **By design** — the status is technically correct. Effects don't need to settle. These nodes now carry `meta._internal: True` and can be filtered from `describe()` output when undesired.
156156
- **Reduction primitives cross-language parity (Phase 8.1, noted 2026-04-06, QA 2026-04-06, updated 8.2 2026-04-06):** Both TS and PY implement `stratify`, `funnel`, `feedback`, `budget_gate`/`budgetGate`, and `scorer` in `patterns/reduction`. All five follow the orchestration factory pattern (`_base_meta`, `_register_step`). Key alignment: (1) `stratify` buffers DIRTY until DATA arrives — on classifier miss, emits `[DIRTY, RESOLVED]` to preserve spec §1.3.1 (both). (2) `funnel` now uses `bridge()` from `core/bridge` for inter-stage wiring — graph-visible effect nodes (`__bridge_<from>→<stage>_input`) replace the old subscribe-based bridges. Visible in `describe()`, participates in two-phase push. (3) `feedback` counter node is source of truth (resettable via `graph.set()`); feedback wiring now uses a graph-registered effect node (`__feedback_effect_<condition>`) instead of raw subscribe. Counter name is `__feedback_<condition>` to support multiple loops per graph. (4) `budget_gate`/`budgetGate` force-flushes all buffered items on terminal regardless of budget; sends RESUME before terminal if paused; forwards constraint ERROR downstream, silences constraint COMPLETE, forwards unknown constraint types via default. (5) `scorer` coerces `None`/`undefined` to 0 before multiplication (no TypeError/NaN divergence). TS `ScoredItem` is a plain object; PY `ScoredItem` is a class with `__slots__` and `__eq__`. Meta keys: `reduction: True`, `reduction_type: "<name>"`. Both repos: 22 tests each.
157157

158+
- **`stratify` two-dep gating (Phase 8.1/9.0, noted roadmap, resolved 2026-04-06, QA 2026-04-06):** `stratify` `on_message` previously intercepted only source messages (dep 0) and let rules messages (dep 1) fall through to default bitmask handling. When both source and rules updated in the same `batch()`, source DATA was classified against stale rules (rules DATA hadn't drained yet), and a spurious second emission occurred from default settlement of dep 1. Fix: `on_message` now intercepts **all** messages from both deps, implementing manual two-dep gating (same diamond-resolution pattern as `budget_gate`). Classification deferred until all dirty deps settle. Rules-only changes produce no downstream emission ("future items only"). QA hardening: (1) PAUSE/RESUME/INVALIDATE from rules dep swallowed (internal impl detail, not leaked downstream). (2) `classify` exceptions caught and treated as non-match (branch not killed). (3) `complete_when_deps_complete=False` explicit (branch lifecycle driven by source terminal, not auto-complete from both deps). Both TS and PY.
159+
158160
- **Cleanup wrapper `{"cleanup": fn, "value"?: v}` (all phases, noted 2026-04-06, resolved 2026-04-06):** Node fn can now return `{"cleanup": callable, "value": v}` to explicitly separate cleanup from data. When returned: `cleanup` is registered; if `"value"` key is present, it's emitted as data. Plain callable returns remain cleanup for backward compat. Exported as `CleanupResult<T>` (TS) / documented dict pattern (PY). Resolves the documented limitation where returning a callable as data was silently consumed as cleanup.
159161

160162
---

docs/roadmap.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ These fix protocol violations identified post-8.2 ship. Must land before §9.2 a
2626

2727
- [x] Rearchitect `feedback()` as graph-visible bridge node (8.2 → 9.0) — replaces subscribe-based shortcut; enables proper DIRTY→DATA two-phase on reentry/counter; resolves bare-DATA protocol gap
2828
- [x] Rearchitect `funnel()` bridges as graph-visible nodes (8.2 → 9.0) — replaces subscribe forwarding; resolves §5.9 imperative trigger violation + teardown leak
29-
- [ ] `stratify` two-dep gating (8.2 → 9.0) — gate classification on both source and rules settling (eliminates stale-rules race)
29+
- [x] `stratify` two-dep gating (8.2 → 9.0) — gate classification on both source and rules settling (eliminates stale-rules race)
3030

3131
### Wave 2 (Python): Audit & accountability parity (Weeks 4-9)
3232

@@ -555,7 +555,7 @@ Pre-wired graphs for common "info → action" domains. Users fork/extend.
555555
- [x] `data_quality_graph(opts)` → Graph — DB/API ingest → schema validation → anomaly detection → drift alerting → remediation suggestions
556556
- [x] Rearchitect `feedback()` as graph-visible bridge node (replaces subscribe-based shortcut; enables proper DIRTY→DATA two-phase on reentry/counter; resolves bare-DATA protocol gap)
557557
- [x] Rearchitect `funnel()` bridges as graph-visible nodes (replaces subscribe forwarding; resolves §5.9 imperative trigger violation + teardown leak)
558-
- [ ] `stratify` two-dep gating: gate classification on both source and rules settling (eliminates stale-rules race when both updated in same `batch()`)
558+
- [x] `stratify` two-dep gating: gate classification on both source and rules settling (eliminates stale-rules race when both updated in same `batch()`)
559559

560560
### 8.3 — LLM graph composition
561561

src/graphrefly/core/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
NodeImpl,
2727
NodeStatus,
2828
SubscribeHints,
29+
cleanup_result,
2930
node,
3031
)
3132
from graphrefly.core.protocol import (

src/graphrefly/core/node.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,25 @@ def _status_after_message(status: NodeStatus, msg: Message) -> NodeStatus:
106106
type Message = tuple[Any, Any] | tuple[Any]
107107

108108

109+
_CLEANUP_RESULT = "__graphrefly_cleanup_result__"
110+
"""Branded key that marks a :func:`cleanup_result` wrapper — prevents duck-type
111+
collisions with domain dicts that happen to have a ``cleanup`` key."""
112+
113+
114+
def cleanup_result(cleanup: Callable[[], None], value: Any = _SENTINEL) -> dict[str, Any]:
115+
"""Create a branded cleanup-result wrapper.
116+
117+
>>> node([dep], lambda vals, _: cleanup_result(release, computed))
118+
"""
119+
r: dict[str, Any] = {_CLEANUP_RESULT: True, "cleanup": cleanup}
120+
if value is not _SENTINEL:
121+
r["value"] = value
122+
return r
123+
124+
109125
def _is_cleanup_result(value: object) -> bool:
110-
"""Check for explicit cleanup wrapper: ``{"cleanup": fn}`` or ``{"cleanup": fn, "value": v}``."""
111-
return (
112-
isinstance(value, dict)
113-
and "cleanup" in value
114-
and callable(value["cleanup"])
115-
)
126+
"""Check for branded cleanup wrapper created by :func:`cleanup_result`."""
127+
return isinstance(value, dict) and value.get(_CLEANUP_RESULT) is True
116128

117129

118130
def _is_cleanup_fn(value: object) -> bool:

src/graphrefly/patterns/reduction.py

Lines changed: 79 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -151,55 +151,105 @@ def _add_branch(
151151
) -> None:
152152
"""Add a stratify branch to the graph.
153153
154+
Two-dep gating: intercepts messages from **both** source (dep 0) and rules
155+
(dep 1). Classification is deferred until all dirty deps have settled,
156+
eliminating the stale-rules race when both are updated in the same
157+
``batch()``.
158+
154159
Protocol: DIRTY is buffered until DATA arrives. If the classifier matches,
155160
emit [DIRTY, DATA]. If not, emit [DIRTY, RESOLVED] so downstream exits
156161
dirty status cleanly (spec §1.3.1). Source RESOLVED forwards as RESOLVED.
162+
Rules-only changes produce no downstream emission ("future items only").
157163
"""
158164
branch_name = f"branch/{rule.name}"
159-
pending_dirty = [False]
165+
_no_value = object()
166+
167+
# Per-branch two-dep gating state
168+
_source_dirty = [False]
169+
_rules_dirty = [False]
170+
_source_phase2 = [False] # source delivered DATA or RESOLVED this cycle
171+
_source_value: list[Any] = [_no_value] # DATA payload, or _no_value for RESOLVED
172+
_pending_dirty = [False] # owe downstream a DIRTY
173+
174+
def _resolve(actions: Any) -> None:
175+
if _source_phase2[0]:
176+
_source_phase2[0] = False
177+
value = _source_value[0]
178+
_source_value[0] = _no_value
179+
if value is not _no_value:
180+
# Source emitted DATA — classify with settled rules
181+
current_rules: list[StratifyRule] = rules_node.get() or []
182+
current_rule = next((r for r in current_rules if r.name == rule.name), None)
183+
try:
184+
matches = current_rule is not None and current_rule.classify(value)
185+
except Exception:
186+
matches = False
187+
if matches:
188+
_pending_dirty[0] = False
189+
actions.emit(value)
190+
else:
191+
if _pending_dirty[0]:
192+
_pending_dirty[0] = False
193+
actions.down([(MessageType.DIRTY,), (MessageType.RESOLVED,)])
194+
else:
195+
# Source RESOLVED (unchanged)
196+
if _pending_dirty[0]:
197+
_pending_dirty[0] = False
198+
actions.down([(MessageType.DIRTY,), (MessageType.RESOLVED,)])
199+
else:
200+
actions.down([(MessageType.RESOLVED,)])
201+
# else: rules-only change — no reclassification ("future items only")
160202

161203
def on_message(msg: Any, dep_index: int, actions: Any) -> bool:
162-
# Only intercept source messages (dep 0)
163-
if dep_index != 0:
164-
return False
165-
166204
t = msg[0]
167-
if t is MessageType.DATA:
168-
value = msg[1]
169-
current_rules: list[StratifyRule] = rules_node.get() or []
170-
current_rule = next((r for r in current_rules if r.name == rule.name), None)
171-
if current_rule is not None and current_rule.classify(value):
172-
# Match: emit (actions.emit handles DIRTY+DATA wrapping)
173-
pending_dirty[0] = False
174-
actions.emit(value)
175-
else:
176-
# No match: emit DIRTY + RESOLVED so downstream exits dirty
177-
if pending_dirty[0]:
178-
pending_dirty[0] = False
179-
actions.down([(MessageType.DIRTY,), (MessageType.RESOLVED,)])
180-
return True
205+
206+
# --- DIRTY (phase 1) ---
181207
if t is MessageType.DIRTY:
182-
pending_dirty[0] = True
208+
if dep_index == 0:
209+
_source_dirty[0] = True
210+
_pending_dirty[0] = True
211+
else:
212+
_rules_dirty[0] = True
183213
return True
184-
if t is MessageType.RESOLVED:
185-
# Source unchanged — forward RESOLVED (with buffered DIRTY if any)
186-
if pending_dirty[0]:
187-
pending_dirty[0] = False
188-
actions.down([(MessageType.DIRTY,), (MessageType.RESOLVED,)])
214+
215+
# --- Phase 2 (DATA / RESOLVED) ---
216+
if t is MessageType.DATA or t is MessageType.RESOLVED:
217+
if dep_index == 0:
218+
_source_dirty[0] = False
219+
_source_phase2[0] = True
220+
_source_value[0] = msg[1] if t is MessageType.DATA else _no_value
189221
else:
190-
actions.down([(MessageType.RESOLVED,)])
222+
_rules_dirty[0] = False
223+
224+
# Wait for all dirty deps to settle
225+
if _source_dirty[0] or _rules_dirty[0]:
226+
return True
227+
228+
_resolve(actions)
191229
return True
230+
231+
# --- Terminal ---
192232
if t in (MessageType.COMPLETE, MessageType.ERROR, MessageType.TEARDOWN):
193-
pending_dirty[0] = False
194-
actions.down([msg])
233+
_source_dirty[0] = False
234+
_rules_dirty[0] = False
235+
_source_phase2[0] = False
236+
_source_value[0] = _no_value
237+
_pending_dirty[0] = False
238+
if dep_index == 0:
239+
actions.down([msg])
240+
# Rules terminal: swallow (branch stays alive)
195241
return True
196-
return False
242+
243+
# Swallow PAUSE/RESUME/INVALIDATE from rules dep (internal impl detail);
244+
# forward unknown source messages via default handling.
245+
return dep_index == 1
197246

198247
filter_node = node(
199248
[source, rules_node],
200249
lambda _d, _a: None,
201250
on_message=on_message,
202251
describe_kind="operator",
252+
complete_when_deps_complete=False,
203253
meta=_base_meta("stratify_branch", {"branch": rule.name}),
204254
)
205255

tests/test_reduction.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,104 @@ def _sink(msgs: list) -> None:
112112
source.down([(MessageType.COMPLETE,)])
113113
assert completed[0] is True
114114

115+
def test_two_dep_gating_batch_source_and_rules(self) -> None:
116+
"""Stale-rules race: both source and rules update in same batch."""
117+
source = state("x")
118+
rules = [StratifyRule("match", classify=lambda v: v == "a")]
119+
120+
g = stratify("gate", source, rules)
121+
122+
seen: list[str] = []
123+
124+
def _sink(msgs: list) -> None:
125+
for msg in msgs:
126+
if msg[0] is MessageType.DATA:
127+
seen.append(msg[1])
128+
129+
g.resolve("branch/match").subscribe(_sink)
130+
131+
# Batch: update rules to match "b" AND send source "b" simultaneously.
132+
# Without two-dep gating, classification would use old rules (match "a")
133+
# and "b" would be dropped. With gating, both settle first.
134+
from graphrefly.core.protocol import batch
135+
136+
with batch():
137+
g.set("rules", [StratifyRule("match", classify=lambda v: v == "b")])
138+
source.down([(MessageType.DATA, "b")])
139+
140+
assert seen == ["b"]
141+
142+
def test_source_only_update_still_works(self) -> None:
143+
"""Regression: source-only updates must still classify correctly."""
144+
source = state(0)
145+
rules = [StratifyRule("pos", classify=lambda v: v > 0)]
146+
147+
g = stratify("src-only", source, rules)
148+
seen: list[int] = []
149+
150+
def _sink(msgs: list) -> None:
151+
for msg in msgs:
152+
if msg[0] is MessageType.DATA:
153+
seen.append(msg[1])
154+
155+
g.resolve("branch/pos").subscribe(_sink)
156+
157+
source.down([(MessageType.DATA, 5)])
158+
source.down([(MessageType.DATA, -1)])
159+
source.down([(MessageType.DATA, 3)])
160+
assert seen == [5, 3]
161+
162+
def test_rules_only_update_no_downstream(self) -> None:
163+
"""Rules-only change should not trigger reclassification."""
164+
source = state("a")
165+
rules = [StratifyRule("match", classify=lambda v: v == "a")]
166+
167+
g = stratify("rules-only", source, rules)
168+
169+
emissions: list[object] = []
170+
171+
def _sink(msgs: list) -> None:
172+
for msg in msgs:
173+
emissions.append(msg)
174+
175+
g.resolve("branch/match").subscribe(_sink)
176+
177+
# Initial source DATA
178+
source.down([(MessageType.DATA, "a")])
179+
initial_count = len(emissions)
180+
181+
# Change rules only — should NOT produce any new downstream messages
182+
g.set("rules", [StratifyRule("match", classify=lambda v: v == "b")])
183+
assert len(emissions) == initial_count
184+
185+
def test_both_settle_source_resolved(self) -> None:
186+
"""Source DIRTY→RESOLVED + rules DATA in same batch → RESOLVED downstream."""
187+
source = state("x")
188+
rules = [StratifyRule("match", classify=lambda v: v == "x")]
189+
190+
g = stratify("resolved", source, rules)
191+
192+
data_seen: list[object] = []
193+
resolved_count = [0]
194+
195+
def _sink(msgs: list) -> None:
196+
for msg in msgs:
197+
if msg[0] is MessageType.DATA:
198+
data_seen.append(msg[1])
199+
elif msg[0] is MessageType.RESOLVED:
200+
resolved_count[0] += 1
201+
202+
g.resolve("branch/match").subscribe(_sink)
203+
204+
# Initial emission
205+
source.down([(MessageType.DATA, "x")])
206+
assert data_seen == ["x"]
207+
208+
# Now update rules in isolation — source not involved
209+
g.set("rules", [StratifyRule("match", classify=lambda v: v == "y")])
210+
# No new data should appear
211+
assert data_seen == ["x"]
212+
115213

116214
# ---------------------------------------------------------------------------
117215
# funnel

0 commit comments

Comments
 (0)