refactor(harness): drive ui::approval events from agent::events stream#136
refactor(harness): drive ui::approval events from agent::events stream#136ytallo wants to merge 1 commit into
Conversation
The approval-gate already writes approval_requested / approval_resolved
frames to the agent::events stream, and the fanout already subscribes to
that stream. The 1s approval poll was duplicate machinery layered on top.
Forward approval frames from the existing stream subscriber to
all-sessions browsers as ui::approval::{requested,resolved}::<browser_id>.
Hydrate pending approvals once per new all-sessions subscriber by calling
approval::list_pending at subscribe time instead of on a timer.
Removes: spawn_approval_poll, APPROVAL_POLL_INTERVAL_MS, diff_approvals,
and FanoutPumps.approval_poll. approval::list_pending stays as the
hydration RPC.
📝 WalkthroughWalkthroughThis PR replaces approval UI fanout from periodic polling to a reactive event-driven pipeline. The approval polling pump and diff logic are removed, replaced with reactive classification of ChangesApproval fanout reactive migration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
skill-check — worker0 verified, 25 skipped (no docs/).
Three for three. Nicely done. |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (5)
harness/src/fanout.rs (4)
522-540: 💤 Low value
hydration_payloadssilently overwrites any incomingtypefield on the entry.Today
approval::list_pendingdoesn't put atypeon entries, so this is a no-op. But the function inserts"type": "approval_requested"unconditionally to feedclassify_approval_frame, so iflist_pending's response ever grows atypefield (e.g. some future"type": "approval_resolved"straggler), this code will rewrite it before classifying. Thestatus=="pending"pre-filter mostly saves you, but it's worth a sentence in the doc comment so a future reader doesn't wonder why we'd ever stamptypeover a real value.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/fanout.rs` around lines 522 - 540, hydration_payloads currently unconditionally overwrites any existing "type" field on each pending entry before calling classify_approval_frame; change the logic inside hydration_payloads so it only inserts "type": "approval_requested" when the entry does not already have a "type" key (i.e., check obj.contains_key("type") first), preserve the rest of the flow/classification with classify_approval_frame, and add a brief doc comment above hydration_payloads explaining that we only stamp a default type when missing to avoid clobbering upstream-provided types such as "approval_resolved".
8-19: 💤 Low valueModule doc comment is stale: reactive approval forwarding isn't mentioned.
Item 1 still describes only
ui::session::event::<browser_id>forwarding, but the same agent::events subscriber now also classifies approval frames and pushesui::approval::{requested,resolved}::<browser_id>. While you're here, item 2 also lists only the sessions-changed poll — the file actually wires cost/workers polls and the iii-directory on-change pumps too. Worth a one-paragraph refresh so new readers don't trust the count.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/fanout.rs` around lines 8 - 19, The module doc comment is out of date: update the header paragraph to list all upstream pumps and what they do — mention the agent::events stream subscriber now also classifies approval frames and pushes ui::approval::{requested,resolved}::<browser_id> in addition to ui::session::event::<browser_id>, and extend the second item to enumerate the sessions-changed poll plus the cost/workers polls and the iii-directory on-change pumps that are wired in this file; keep the description concise and accurate so readers see the full set of pumps and their high-level responsibilities (agent::events handler, sessions-changed poll, cost/workers polls, iii-directory on-change).
567-583: 💤 Low valueHydration fans
approval::list_pendingsequentially across all sessions.For each session this is a wire RTT under a 5s timeout, so a host with N active sessions makes the late-joiner wait up to N × RTT before the first synthesized
requestedlands.futures::future::join_all(ortry_join_all) over the per-session calls would cut this to a single RTT without changing semantics. Not a blocker — hydration is fire-and-forget — but a cheap win on systems with many sessions.♻️ Sketch
- let mut per_session: Vec<(String, Vec<Value>)> = Vec::with_capacity(sessions.len()); - for sid in &sessions { - let resp = iii - .trigger(TriggerRequest { - function_id: "approval::list_pending".into(), - payload: json!({ "session_id": sid }), - action: None, - timeout_ms: Some(STATE_LIST_TIMEOUT_MS), - }) - .await; - let entries = resp - .ok() - .and_then(|v| v.get("pending").and_then(|p| p.as_array()).cloned()) - .unwrap_or_default(); - if !entries.is_empty() { - per_session.push((sid.clone(), entries)); - } - } + let calls = sessions.iter().map(|sid| { + let iii = Arc::clone(&iii); + let sid = sid.clone(); + async move { + let resp = iii + .trigger(TriggerRequest { + function_id: "approval::list_pending".into(), + payload: json!({ "session_id": sid }), + action: None, + timeout_ms: Some(STATE_LIST_TIMEOUT_MS), + }) + .await; + let entries = resp + .ok() + .and_then(|v| v.get("pending").and_then(|p| p.as_array()).cloned()) + .unwrap_or_default(); + (sid, entries) + } + }); + let per_session: Vec<(String, Vec<Value>)> = futures::future::join_all(calls) + .await + .into_iter() + .filter(|(_, entries)| !entries.is_empty()) + .collect();🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/fanout.rs` around lines 567 - 583, The loop that calls iii.trigger(TriggerRequest { function_id: "approval::list_pending", ... }) for each sid runs sequentially and causes N×RTT latency; change it to collect each trigger future (referencing sessions, iii.trigger, TriggerRequest, STATE_LIST_TIMEOUT_MS) into a Vec of futures and use futures::future::join_all (or try_join_all) to await them concurrently, then iterate the joined results to extract "pending" arrays and push non-empty (sid.clone(), entries) into per_session preserving the same extraction logic (ok().and_then(...).unwrap_or_default()) so behavior is unchanged but all RPCs happen in parallel.
384-400: ⚡ Quick winReactive approval pushes bypass backpressure and stale-browser GC.
Two consistency gaps versus the rest of the file:
- Unlike
push_to_browser(used by cost/workers/hydration paths), thesetokio::spawn-ed triggers don't count againstPER_BROWSER_QUEUE_CAPor arm theui::session::resyncdeduper. A fast burst ofapproval_*frames to a slow browser can run unbounded in-flight, undercutting the per-browser cap guarantee.- Unlike the per-session forward immediately below (lines 432-446), there's no
is_function_not_found→evict_browserpath. An all-sessions browser that closed withoutui::unsubscribeis only GC'd when it happens to also be subscribed to a specific session; pure all-sessions clients will silently logfunction_not_foundon every approval frame forever.Routing these pushes through
push_to_browser(and folding in the same GC branch onfunction_not_found) would fix both. If you keep the raw-spawn form, at least add the GC arm — it's the same six lines as 432-442.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/fanout.rs` around lines 384 - 400, The approval push loop using approval_pushes_for currently tokio::spawn(s) direct iii.trigger calls which bypass per-browser backpressure (PER_BROWSER_QUEUE_CAP) and the ui::session::resync deduper and also omits the is_function_not_found → evict_browser GC path; change the code to call the existing push_to_browser path (reuse push_to_browser(...) with the same TriggerRequest/payload) so pushes honor PER_BROWSER_QUEUE_CAP and arm resync, and ensure the function_not_found handling from the per-session forward (the evict_browser branch guarded by is_function_not_found) is executed for approval pushes too; if you decide to keep raw tokio::spawn, then at minimum implement the same queue accounting/resync arming and replicate the six-line is_function_not_found→evict_browser GC branch used in lines ~432-442 around the per-session forward.harness/src/lib.rs (1)
278-278: ⚡ Quick winSimplify to use
iii.clone()directly, matching the pattern elsewhere in the file.Wrapping
iii.clone()inArc::newcreates unnecessary double indirection:Arc<Arc<T>>. Lines 145, 194, and 379 all captureiii.clone()directly into closures without Arc wrapping and work fine. Since the comment at lines 371–372 confirmsIIIis already internally Arc-wrapped, the outerArc::new()is redundant. Uselet iii_for_subscribe = iii.clone();instead for consistency.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/lib.rs` at line 278, The variable iii_for_subscribe is wrapped with Arc::new(iii.clone()) causing an Arc<Arc<T>> double indirection; replace that line with let iii_for_subscribe = iii.clone(); so the closure captures the existing Arc-wrapped III directly (match the pattern used for iii.clone() elsewhere and avoid the redundant outer Arc::new()).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@harness/src/fanout.rs`:
- Around line 509-512: The synthesized "approval_resolved" push in
harness/src/fanout.rs uses ApprovalUiPush::Resolved with a payload containing
only "function_call_id" and "tool_call_id" (call_id), so update the TypeScript
shape to match that minimal payload: edit the approval_resolved type in
harness/web/src/types.ts (the type referenced by resolvedCallId()) to either
remove the declared decision and reason fields or mark them optional, leaving
only function_call_id and tool_call_id present; alternatively, if you prefer to
keep the existing TS shape, change the Rust fanout to include decision and
reason in the JSON payload so both sides agree.
In `@harness/src/lib.rs`:
- Around line 312-319: The tokio::spawn is fire-and-forget and drops any errors
from fanout::hydrate_all_sessions_subscriber, so wrap the call to
fanout::hydrate_all_sessions_subscriber(iii_for_hydrate, fanout_for_hydrate,
browser_for_hydrate).await inside an error-checking block in the spawned task
and log failures (e.g., using error! or processLogger) with the error and
contextual info; locate the existing tokio::spawn invocation and replace it with
an async move that calls the hydrate_all_sessions_subscriber, matches on the
Result, and logs any Err along with identifying context
(iii_for_hydrate/fanout_for_hydrate/browser_for_hydrate) so hydration failures
are observable.
---
Nitpick comments:
In `@harness/src/fanout.rs`:
- Around line 522-540: hydration_payloads currently unconditionally overwrites
any existing "type" field on each pending entry before calling
classify_approval_frame; change the logic inside hydration_payloads so it only
inserts "type": "approval_requested" when the entry does not already have a
"type" key (i.e., check obj.contains_key("type") first), preserve the rest of
the flow/classification with classify_approval_frame, and add a brief doc
comment above hydration_payloads explaining that we only stamp a default type
when missing to avoid clobbering upstream-provided types such as
"approval_resolved".
- Around line 8-19: The module doc comment is out of date: update the header
paragraph to list all upstream pumps and what they do — mention the
agent::events stream subscriber now also classifies approval frames and pushes
ui::approval::{requested,resolved}::<browser_id> in addition to
ui::session::event::<browser_id>, and extend the second item to enumerate the
sessions-changed poll plus the cost/workers polls and the iii-directory
on-change pumps that are wired in this file; keep the description concise and
accurate so readers see the full set of pumps and their high-level
responsibilities (agent::events handler, sessions-changed poll, cost/workers
polls, iii-directory on-change).
- Around line 567-583: The loop that calls iii.trigger(TriggerRequest {
function_id: "approval::list_pending", ... }) for each sid runs sequentially and
causes N×RTT latency; change it to collect each trigger future (referencing
sessions, iii.trigger, TriggerRequest, STATE_LIST_TIMEOUT_MS) into a Vec of
futures and use futures::future::join_all (or try_join_all) to await them
concurrently, then iterate the joined results to extract "pending" arrays and
push non-empty (sid.clone(), entries) into per_session preserving the same
extraction logic (ok().and_then(...).unwrap_or_default()) so behavior is
unchanged but all RPCs happen in parallel.
- Around line 384-400: The approval push loop using approval_pushes_for
currently tokio::spawn(s) direct iii.trigger calls which bypass per-browser
backpressure (PER_BROWSER_QUEUE_CAP) and the ui::session::resync deduper and
also omits the is_function_not_found → evict_browser GC path; change the code to
call the existing push_to_browser path (reuse push_to_browser(...) with the same
TriggerRequest/payload) so pushes honor PER_BROWSER_QUEUE_CAP and arm resync,
and ensure the function_not_found handling from the per-session forward (the
evict_browser branch guarded by is_function_not_found) is executed for approval
pushes too; if you decide to keep raw tokio::spawn, then at minimum implement
the same queue accounting/resync arming and replicate the six-line
is_function_not_found→evict_browser GC branch used in lines ~432-442 around the
per-session forward.
In `@harness/src/lib.rs`:
- Line 278: The variable iii_for_subscribe is wrapped with Arc::new(iii.clone())
causing an Arc<Arc<T>> double indirection; replace that line with let
iii_for_subscribe = iii.clone(); so the closure captures the existing
Arc-wrapped III directly (match the pattern used for iii.clone() elsewhere and
avoid the redundant outer Arc::new()).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ca9ca733-d3dc-4844-8692-f1792d452c32
📒 Files selected for processing (2)
harness/src/fanout.rsharness/src/lib.rs
| "approval_resolved" => Some(ApprovalUiPush::Resolved(json!({ | ||
| "function_call_id": call_id, | ||
| "tool_call_id": call_id, | ||
| }))), |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Inspect resolved-frame consumers to confirm they only need the call id.
rg -nP -C5 '\bapproval::resolved\b|approval_resolved|ApprovalResolved' --type=ts --type=rust
# And the old diff_approvals/poll-resolved payload shape, if still in history.
rg -nP -C5 '\bdiff_approvals\b|approval::list_pending'Repository: iii-hq/workers
Length of output: 40684
🏁 Script executed:
rg -nP -C3 'decision.*allow.*deny|decision.*"allow"|decision.*"deny"' harness/web/src --type=ts -A2Repository: iii-hq/workers
Length of output: 1255
🏁 Script executed:
rg -nP 'resolvedCallId|payload\.(decision|reason)' harness/web/src --type=ts -B2 -A2Repository: iii-hq/workers
Length of output: 727
The synthesized payload correctly sends only the call id, but the TypeScript type definition mismatch could mislead future developers.
The payload at lines 509-512 intentionally drops decision and reason fields from the upstream approval_resolved event. The web consumer in harness/web/src/useStatus.ts confirms this is safe — it only extracts the call id via resolvedCallId() to filter pending approvals and never accesses the decision or reason fields from the payload.
However, the TypeScript type definition in harness/web/src/types.ts (lines 241-242) declares decision and reason as part of the approval_resolved payload, creating a contract mismatch. If future changes attempt to render or audit the outcome based on these type hints, they will find the fields undefined. Consider either sending the full fields or updating the type definition to match the actual minimal payload.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/src/fanout.rs` around lines 509 - 512, The synthesized
"approval_resolved" push in harness/src/fanout.rs uses ApprovalUiPush::Resolved
with a payload containing only "function_call_id" and "tool_call_id" (call_id),
so update the TypeScript shape to match that minimal payload: edit the
approval_resolved type in harness/web/src/types.ts (the type referenced by
resolvedCallId()) to either remove the declared decision and reason fields or
mark them optional, leaving only function_call_id and tool_call_id present;
alternatively, if you prefer to keep the existing TS shape, change the Rust
fanout to include decision and reason in the JSON payload so both sides agree.
| tokio::spawn(async move { | ||
| fanout::hydrate_all_sessions_subscriber( | ||
| iii_for_hydrate, | ||
| fanout_for_hydrate, | ||
| browser_for_hydrate, | ||
| ) | ||
| .await; | ||
| }); |
There was a problem hiding this comment.
Add error logging for hydration failures.
The fire-and-forget tokio::spawn silently discards any errors from hydrate_all_sessions_subscriber. If hydration fails (e.g., due to state::list or approval::list_pending errors), late-joining all-sessions subscribers won't see pending approvals, and the failure won't be observable.
Consider wrapping the spawned task with error logging to improve observability.
📋 Proposed fix to add error logging
if is_all_sessions {
let fanout_for_hydrate = Arc::clone(&fanout);
let browser_for_hydrate = browser_id.clone();
tokio::spawn(async move {
- fanout::hydrate_all_sessions_subscriber(
+ if let Err(e) = fanout::hydrate_all_sessions_subscriber(
iii_for_hydrate,
fanout_for_hydrate,
browser_for_hydrate,
)
- .await;
+ .await {
+ eprintln!("hydrate_all_sessions_subscriber failed: {e:?}");
+ }
});
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| tokio::spawn(async move { | |
| fanout::hydrate_all_sessions_subscriber( | |
| iii_for_hydrate, | |
| fanout_for_hydrate, | |
| browser_for_hydrate, | |
| ) | |
| .await; | |
| }); | |
| tokio::spawn(async move { | |
| if let Err(e) = fanout::hydrate_all_sessions_subscriber( | |
| iii_for_hydrate, | |
| fanout_for_hydrate, | |
| browser_for_hydrate, | |
| ) | |
| .await { | |
| eprintln!("hydrate_all_sessions_subscriber failed: {e:?}"); | |
| } | |
| }); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/src/lib.rs` around lines 312 - 319, The tokio::spawn is
fire-and-forget and drops any errors from
fanout::hydrate_all_sessions_subscriber, so wrap the call to
fanout::hydrate_all_sessions_subscriber(iii_for_hydrate, fanout_for_hydrate,
browser_for_hydrate).await inside an error-checking block in the spawned task
and log failures (e.g., using error! or processLogger) with the error and
contextual info; locate the existing tokio::spawn invocation and replace it with
an async move that calls the hydrate_all_sessions_subscriber, matches on the
Result, and logs any Err along with identifying context
(iii_for_hydrate/fanout_for_hydrate/browser_for_hydrate) so hydration failures
are observable.
Summary
The approval-gate already emits
approval_requested/approval_resolvedframes into theagent::eventsstream, and the harness fanout already runs adurable:subscriberagainst that stream. The 1-second approval poll inspawn_approval_pollwas a second, duplicate path for the same data — diffingapproval::list_pendingsnapshots and emittingui::approval::*events with up to a second of latency.This change forwards approval frames directly from the existing stream subscriber and hydrates new browsers on subscribe instead of on a timer.
What changed
Live updates — the
agent::eventsstream handler now classifies each frame and forwards approval frames to all-sessions subscribers asui::approval::requested::<browser_id>/ui::approval::resolved::<browser_id>. Latency drops from ≤1s (poll cadence) to one stream RTT.Reconnect hydration —
ui::subscribefor all-sessions browsers now triggers a one-shot replay: enumerate sessions viastate::list, callapproval::list_pendingper session, push oneui::approval::requestedper pending entry to the new browser only. Fire-and-forget; the subscribe response doesn't block on it.Deleted —
spawn_approval_poll,APPROVAL_POLL_INTERVAL_MS, thediff_approvalshelper, andFanoutPumps.approval_poll.approval::list_pendingstays as the hydration RPC.Contract
The wire format (
ui::approval::requested::<browser_id>/ui::approval::resolved::<browser_id>payloads) is unchanged. The web reducer (harness/web/src/useStatus.ts) and the TUI (harness-tui/src/bus.rs) need no changes. Legacy field shapes (tool_call_id,tool_name) keep working — covered by tests.Tests
48 → 62 tests in the harness suite. New coverage:
classify_approval_frame: happy path forapproval_requested/approval_resolved; ignores non-approval types; drops missingfunction_call_id; drops emptysession_id; accepts legacytool_call_id/tool_name.hydration_payloads: emits one push per pending entry; skips malformed entries; empty input is a no-op; filters non-pendingstatus (guards against timed-out approvals reappearing on reconnect).hydration_pushes_for: orchestration across multiple sessions; empty input; sessions with no pending skipped.approval_pushes_for: channel naming convention pinned per browser; zero-browser case.Removed: two
diff_approvalstests (dead with the poll).Test plan
cargo test -p harness— 62 passed (5 suites)cargo clippy --lib --tests -- -D warnings— cleanshell::fs::write; approval row appears within stream RTT (not 1s); reload mid-pending → row reappears; multi-tab → both tabs clear on resolvedurable:subscriberdoes not replay historical stream frames on attach. If it does, hydration + replay would double-push; the web reducer's id-dedup mitigates but doesn't fully prevent it.Summary by CodeRabbit