From d84a2350d20b20c88be0aedecb5eb5da13889159 Mon Sep 17 00:00:00 2001 From: Ytallo Layon Date: Thu, 14 May 2026 19:32:29 -0300 Subject: [PATCH] refactor(harness): drive ui::approval events from agent::events stream The approval-gate already writes approval_requested / approval_resolved frames to the agent::events stream, and the fanout already subscribes to that stream. The 1s approval poll was duplicate machinery layered on top. Forward approval frames from the existing stream subscriber to all-sessions browsers as ui::approval::{requested,resolved}::. Hydrate pending approvals once per new all-sessions subscriber by calling approval::list_pending at subscribe time instead of on a timer. Removes: spawn_approval_poll, APPROVAL_POLL_INTERVAL_MS, diff_approvals, and FanoutPumps.approval_poll. approval::list_pending stays as the hydration RPC. --- harness/src/fanout.rs | 625 +++++++++++++++++++++++++++++++----------- harness/src/lib.rs | 27 +- 2 files changed, 492 insertions(+), 160 deletions(-) diff --git a/harness/src/fanout.rs b/harness/src/fanout.rs index b5d2e6ad..0cb19ca1 100644 --- a/harness/src/fanout.rs +++ b/harness/src/fanout.rs @@ -98,11 +98,6 @@ const STATE_LIST_TIMEOUT_MS: u64 = 5_000; /// the engine's default state worker; the wire round-trip dominates. const SESSIONS_POLL_INTERVAL_MS: u64 = 1_000; -/// Approval poll cadence. Hook-driven push (via the agent::events stream -/// pump) covers low-latency notification; this poll catches missed states -/// and clears resolved approvals. -const APPROVAL_POLL_INTERVAL_MS: u64 = 1_000; - /// Cost summary poll cadence. Each tick performs a `budget::list` and (for /// changed budgets) a `budget::usage` round-trip. 2s is cheap and matches /// the design coalescing target. @@ -199,7 +194,6 @@ pub struct FanoutPumps { pub prompts_on_change_fn: FunctionRef, pub prompts_on_change_trigger: Option, pub sessions_poll: tokio::task::JoinHandle<()>, - pub approval_poll: tokio::task::JoinHandle<()>, pub cost_poll: tokio::task::JoinHandle<()>, pub workers_poll: tokio::task::JoinHandle<()>, } @@ -219,7 +213,6 @@ impl FanoutPumps { self.skills_on_change_fn.unregister(); self.prompts_on_change_fn.unregister(); self.sessions_poll.abort(); - self.approval_poll.abort(); self.cost_poll.abort(); self.workers_poll.abort(); } @@ -262,7 +255,6 @@ pub fn spawn_subscribers(iii: &Arc, fanout: SharedFanout) -> FanoutPumps { ); let sessions_poll = spawn_sessions_changed_poll(Arc::clone(iii), Arc::clone(&fanout)); - let approval_poll = spawn_approval_poll(Arc::clone(iii), Arc::clone(&fanout)); let cost_poll = spawn_cost_poll(Arc::clone(iii), Arc::clone(&fanout)); let workers_poll = spawn_workers_poll(Arc::clone(iii), fanout); @@ -274,7 +266,6 @@ pub fn spawn_subscribers(iii: &Arc, fanout: SharedFanout) -> FanoutPumps { prompts_on_change_fn, prompts_on_change_trigger, sessions_poll, - approval_poll, cost_poll, workers_poll, } @@ -379,6 +370,35 @@ fn register_agent_event_pump(iii: &III, fanout: SharedFanout) -> FunctionRef { let fanout = Arc::clone(&fanout); async move { if let Some((session_id, event_data)) = extract_event_payload(&payload) { + // Reactive ui::approval::* path: gate writes + // approval_{requested,resolved} frames into agent::events; + // we forward them to all-sessions subscribers without + // polling state. Non-approval frames classify as None and + // fall through to the regular ui::session::event forward + // below. + if let Some(push) = classify_approval_frame(&event_data, &session_id) { + let all_sessions = { + let state = fanout.read().await; + state.all_sessions_subscribers() + }; + for (channel, push_payload) in approval_pushes_for(&push, &all_sessions) { + let iii_for_push = iii.clone(); + tokio::spawn(async move { + if let Err(e) = iii_for_push + .trigger(TriggerRequest { + function_id: channel, + payload: push_payload, + action: None, + timeout_ms: Some(PUSH_TIMEOUT_MS), + }) + .await + { + tracing::trace!(error = %e, "ui::approval push failed"); + } + }); + } + } + let browsers = { let state = fanout.read().await; state.subscribers_for(&session_id) @@ -451,6 +471,166 @@ fn extract_event_payload(payload: &Value) -> Option<(String, Value)> { Some((session_id, data)) } +/// Push intent derived from an `agent::events` stream frame. Drives the +/// reactive ui::approval pipeline (replaces the approval poll). +#[derive(Debug, PartialEq, Eq)] +pub enum ApprovalUiPush { + /// Forward as `ui::approval::requested::`. Payload mirrors the + /// poll's enriched record: original gate fields plus `session_id`. + Requested(Value), + /// Forward as `ui::approval::resolved::`. Payload carries the + /// call id under both new and legacy field names so existing consumers in + /// `harness/web/src/useStatus.ts` and `harness-tui/src/types.rs` keep + /// working. + Resolved(Value), +} + +/// Classify an `agent::events` frame body as a UI-bound approval push. +/// +/// Returns `None` for non-approval frames and for malformed approval frames +/// (missing ids). Pure function — wired into the stream subscriber callback. +pub fn classify_approval_frame(data: &Value, session_id: &str) -> Option { + if session_id.is_empty() { + return None; + } + let frame_type = data.get("type").and_then(Value::as_str)?; + let call_id = data + .get("function_call_id") + .or_else(|| data.get("tool_call_id")) + .and_then(Value::as_str)?; + match frame_type { + "approval_requested" => { + let mut payload = data.clone(); + if let Some(obj) = payload.as_object_mut() { + obj.insert("session_id".into(), Value::String(session_id.to_string())); + } + Some(ApprovalUiPush::Requested(payload)) + } + "approval_resolved" => Some(ApprovalUiPush::Resolved(json!({ + "function_call_id": call_id, + "tool_call_id": call_id, + }))), + _ => None, + } +} + +/// Build per-session hydration payloads for a new all-sessions subscriber. +/// +/// Each entry in `pending` (as returned by `approval::list_pending`) becomes +/// one ui::approval::requested-ready payload. Filters: only `status=pending` +/// entries; entries without a call id are skipped via `classify_approval_frame`. +pub fn hydration_payloads(session_id: &str, pending: &[Value]) -> Vec { + pending + .iter() + .filter(|entry| entry.get("status").and_then(Value::as_str) == Some("pending")) + .filter_map(|entry| { + let mut synth = entry.clone(); + if let Some(obj) = synth.as_object_mut() { + obj.insert( + "type".into(), + Value::String("approval_requested".into()), + ); + } + match classify_approval_frame(&synth, session_id)? { + ApprovalUiPush::Requested(payload) => Some(payload), + ApprovalUiPush::Resolved(_) => None, + } + }) + .collect() +} + +/// Hydrate a freshly-attached all-sessions subscriber. +/// +/// Enumerates active sessions via `state::list`, fetches pending approvals via +/// `approval::list_pending`, and pushes `ui::approval::requested::` +/// for each. Replaces the periodic poll for the reconnect/late-join case. +/// Fire-and-forget: spawn this; do not await it from request handlers. +pub async fn hydrate_all_sessions_subscriber( + iii: Arc, + fanout: SharedFanout, + browser_id: String, +) { + let sessions = match iii + .trigger(TriggerRequest { + function_id: "state::list".into(), + payload: json!({ "scope": "agent", "prefix": "session/" }), + action: None, + timeout_ms: Some(STATE_LIST_TIMEOUT_MS), + }) + .await + { + Ok(v) => extract_session_ids(&v), + Err(_) => return, + }; + + let mut per_session: Vec<(String, Vec)> = Vec::with_capacity(sessions.len()); + for sid in &sessions { + let resp = iii + .trigger(TriggerRequest { + function_id: "approval::list_pending".into(), + payload: json!({ "session_id": sid }), + action: None, + timeout_ms: Some(STATE_LIST_TIMEOUT_MS), + }) + .await; + let entries = resp + .ok() + .and_then(|v| v.get("pending").and_then(|p| p.as_array()).cloned()) + .unwrap_or_default(); + if !entries.is_empty() { + per_session.push((sid.clone(), entries)); + } + } + + for (channel, payload) in hydration_pushes_for(&browser_id, &per_session) { + push_to_browser( + &iii, + &fanout, + &browser_id, + channel, + payload, + PushKind::Standard, + ); + } +} + +/// Orchestration helper for subscribe-time hydration. +/// +/// Given a freshly-attached all-sessions subscriber and the per-session +/// `pending` lists already fetched from `approval::list_pending`, produce the +/// `(channel, payload)` pairs the caller should drive through `iii.trigger`. +/// Pure function — the async glue (state::list, list_pending, trigger) wraps +/// this. +pub fn hydration_pushes_for( + browser_id: &str, + per_session: &[(String, Vec)], +) -> Vec<(String, Value)> { + let channel = format!("ui::approval::requested::{browser_id}"); + per_session + .iter() + .flat_map(|(session_id, pending)| { + hydration_payloads(session_id, pending) + .into_iter() + .map(|payload| (channel.clone(), payload)) + }) + .collect() +} + +/// Fan a classified approval push out to all-sessions subscribers. +/// +/// Returns `(channel, payload)` pairs the pump can hand to `iii.trigger`. +/// Keeps wire channel naming and per-browser cloning isolated and testable. +pub fn approval_pushes_for(push: &ApprovalUiPush, browser_ids: &[String]) -> Vec<(String, Value)> { + let (root, payload) = match push { + ApprovalUiPush::Requested(p) => ("ui::approval::requested", p), + ApprovalUiPush::Resolved(p) => ("ui::approval::resolved", p), + }; + browser_ids + .iter() + .map(|b| (format!("{root}::{b}"), payload.clone())) + .collect() +} + fn spawn_sessions_changed_poll(iii: Arc, fanout: SharedFanout) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut prev: HashSet = HashSet::new(); @@ -664,122 +844,6 @@ pub(crate) fn diff_workers( })) } -/// Pure diff helper for the approval pump. Returns the IDs that newly -/// appeared in `next` and the ones that were removed since `prev`. -pub(crate) fn diff_approvals( - prev: &HashMap, - next: &HashMap, -) -> (Vec<(String, Value)>, Vec) { - let mut requested: Vec<(String, Value)> = next - .iter() - .filter(|(k, _)| !prev.contains_key(*k)) - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - requested.sort_by(|a, b| a.0.cmp(&b.0)); - let mut resolved: Vec = prev - .keys() - .filter(|k| !next.contains_key(*k)) - .cloned() - .collect(); - resolved.sort(); - (requested, resolved) -} - -/// Spawn the approval pump. Polls `approval::list_pending` for every known -/// session every `APPROVAL_POLL_INTERVAL_MS`. On change, pushes -/// `ui::approval::requested` (per new entry) and `ui::approval::resolved` -/// (per removed entry) to all-sessions subscribers. -fn spawn_approval_poll(iii: Arc, fanout: SharedFanout) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - let mut prev: HashMap = HashMap::new(); - let mut interval = tokio::time::interval(Duration::from_millis(APPROVAL_POLL_INTERVAL_MS)); - interval.tick().await; - loop { - interval.tick().await; - - // Discover known sessions; without them we have nowhere to ask. - let session_ids = match iii - .trigger(TriggerRequest { - function_id: "state::list".into(), - payload: json!({ "scope": "agent", "prefix": "session/" }), - action: None, - timeout_ms: Some(STATE_LIST_TIMEOUT_MS), - }) - .await - { - Ok(v) => extract_session_ids(&v), - Err(_) => continue, - }; - - let mut next: HashMap = HashMap::new(); - for sid in &session_ids { - let resp = iii - .trigger(TriggerRequest { - function_id: "approval::list_pending".into(), - payload: json!({ "session_id": sid }), - action: None, - timeout_ms: Some(STATE_LIST_TIMEOUT_MS), - }) - .await; - let Ok(resp) = resp else { continue }; - let Some(arr) = resp.get("pending").and_then(|v| v.as_array()) else { - continue; - }; - for entry in arr { - let Some(id) = entry - .get("function_call_id") - .or_else(|| entry.get("tool_call_id")) - .and_then(|v| v.as_str()) - else { - continue; - }; - // Annotate with session_id so the UI can group/filter. - let mut enriched = entry.clone(); - if let Some(obj) = enriched.as_object_mut() { - obj.insert("session_id".into(), Value::String(sid.clone())); - } - next.insert(id.to_string(), enriched); - } - } - - let (requested, resolved) = diff_approvals(&prev, &next); - if requested.is_empty() && resolved.is_empty() { - continue; - } - - let browsers = { - let state = fanout.read().await; - state.all_sessions_subscribers() - }; - - for browser_id in &browsers { - for (_id, payload) in &requested { - push_to_browser( - &iii, - &fanout, - browser_id, - format!("ui::approval::requested::{browser_id}"), - payload.clone(), - PushKind::Standard, - ); - } - for id in &resolved { - push_to_browser( - &iii, - &fanout, - browser_id, - format!("ui::approval::resolved::{browser_id}"), - json!({ "function_call_id": id, "tool_call_id": id }), - PushKind::Standard, - ); - } - } - - prev = next; - } - }) -} - /// Spawn the cost poll. Calls `budget::list` every `COST_POLL_INTERVAL_MS`, /// computes a {usd_today, by_provider} summary, and pushes /// `ui::cost::tick` to all-sessions subscribers when totals change. @@ -1185,40 +1249,6 @@ mod tests { assert_eq!(workers[2]["status"], json!("up")); } - #[test] - fn fanout_approval_pump_emits_resolved_on_removal() { - let mut prev: HashMap = HashMap::new(); - prev.insert( - "tc-1".into(), - json!({ "function_call_id": "tc-1", "tool_call_id": "tc-1", "function_id": "write", "tool_name": "write" }), - ); - let next: HashMap = HashMap::new(); - let (requested, resolved) = diff_approvals(&prev, &next); - assert!(requested.is_empty()); - assert_eq!(resolved, vec!["tc-1".to_string()]); - } - - #[test] - fn fanout_approval_pump_emits_requested_then_resolved_in_sequence() { - // Step 1: empty -> {tc-1} - let initial: HashMap = HashMap::new(); - let mut after_request: HashMap = HashMap::new(); - after_request.insert( - "tc-1".into(), - json!({ "function_call_id": "tc-1", "tool_call_id": "tc-1", "function_id": "rm", "tool_name": "rm" }), - ); - let (added, removed) = diff_approvals(&initial, &after_request); - assert_eq!(added.len(), 1); - assert_eq!(added[0].0, "tc-1"); - assert!(removed.is_empty()); - - // Step 2: {tc-1} -> {} after user resolves. - let after_resolve: HashMap = HashMap::new(); - let (added2, removed2) = diff_approvals(&after_request, &after_resolve); - assert!(added2.is_empty()); - assert_eq!(removed2, vec!["tc-1".to_string()]); - } - #[test] fn extract_worker_status_reads_array_form() { let v = json!([ @@ -1340,4 +1370,281 @@ mod tests { assert!(!first, "first overflow should emit resync"); assert!(second, "second overflow should be deduped"); } + + // ─── Reactive approval pipeline ────────────────────────────────────── + // + // classify_approval_frame turns a parsed `agent::events` frame body into + // an explicit push intent. The stream subscriber callback uses it to + // forward approval_requested/resolved frames as ui::approval::* events + // without polling state. + + #[test] + fn classify_approval_frame_requested_enriches_with_session_id() { + let data = json!({ + "type": "approval_requested", + "function_call_id": "c1", + "tool_call_id": "c1", + "function_id": "shell::fs::write", + "tool_name": "shell::fs::write", + "args": { "path": "/tmp/x" }, + "expires_at": 1_234_567_890_u64, + }); + let out = classify_approval_frame(&data, "s1").expect("requested classified"); + match out { + ApprovalUiPush::Requested(payload) => { + assert_eq!(payload["session_id"], json!("s1")); + assert_eq!(payload["function_call_id"], json!("c1")); + assert_eq!(payload["function_id"], json!("shell::fs::write")); + } + ApprovalUiPush::Resolved(_) => panic!("expected Requested, got Resolved"), + } + } + + #[test] + fn classify_approval_frame_resolved_emits_minimal_payload() { + let data = json!({ + "type": "approval_resolved", + "function_call_id": "c1", + "decision": "allow", + }); + let out = classify_approval_frame(&data, "s1").expect("resolved classified"); + match out { + ApprovalUiPush::Resolved(payload) => { + assert_eq!(payload["function_call_id"], json!("c1")); + assert_eq!(payload["tool_call_id"], json!("c1")); + } + ApprovalUiPush::Requested(_) => panic!("expected Resolved, got Requested"), + } + } + + // B1 — non-approval frame types must classify as None so the regular + // session-event forward path keeps owning them. + #[test] + fn classify_approval_frame_ignores_non_approval_types() { + let data = json!({ + "type": "tool_call_started", + "function_call_id": "c1", + "function_id": "shell::fs::read", + }); + assert!(classify_approval_frame(&data, "s1").is_none()); + } + + // B2 — malformed approval frames without an id must be dropped silently, + // never panic, and never produce a push (UI would have nothing to key on). + #[test] + fn classify_approval_frame_drops_when_call_id_missing() { + let requested = json!({ + "type": "approval_requested", + "function_id": "shell::fs::write", + }); + let resolved = json!({ "type": "approval_resolved", "decision": "allow" }); + assert!(classify_approval_frame(&requested, "s1").is_none()); + assert!(classify_approval_frame(&resolved, "s1").is_none()); + } + + // B3 — legacy field names (tool_call_id / tool_name) must still produce a + // push so reload hydration via approval::list_pending stays compatible + // with older gate envelopes captured in state. + #[test] + fn classify_approval_frame_accepts_legacy_tool_call_id() { + let data = json!({ + "type": "approval_requested", + "tool_call_id": "c1", + "tool_name": "shell::fs::write", + "args": {}, + }); + let out = classify_approval_frame(&data, "s1").expect("legacy shape classified"); + match out { + ApprovalUiPush::Requested(payload) => { + assert_eq!(payload["tool_call_id"], json!("c1")); + assert_eq!(payload["session_id"], json!("s1")); + } + ApprovalUiPush::Resolved(_) => panic!("expected Requested"), + } + } + + // B10 — empty session_id must not produce a push whose UI cannot key on a + // session. Today, the upstream extract_event_payload already drops empty + // group ids; this guard pins the contract at the classifier too so future + // refactors of either layer don't open a regression. + #[test] + fn classify_approval_frame_drops_when_session_id_empty() { + let data = json!({ + "type": "approval_requested", + "function_call_id": "c1", + }); + assert!(classify_approval_frame(&data, "").is_none()); + } + + // ─── Hydration payloads ────────────────────────────────────────────── + // + // When a new all-sessions subscriber attaches, we replay + // approval::list_pending per session into the new browser. The pure + // helper below turns one session's `pending` array into a list of + // ui::approval::requested-ready payloads. Tests pin the filters that + // make this safe to call on reconnects (B4, B9, B11). + + // A3 — happy path: each pending entry becomes one enriched payload. + #[test] + fn hydration_payloads_emits_one_per_pending_entry() { + let pending = vec![ + json!({ + "function_call_id": "c1", + "function_id": "shell::fs::write", + "args": { "path": "/a" }, + "status": "pending", + "expires_at": 1u64, + }), + json!({ + "function_call_id": "c2", + "function_id": "shell::fs::mkdir", + "args": { "path": "/b" }, + "status": "pending", + "expires_at": 2u64, + }), + ]; + let out = hydration_payloads("s1", &pending); + assert_eq!(out.len(), 2); + assert_eq!(out[0]["function_call_id"], json!("c1")); + assert_eq!(out[0]["session_id"], json!("s1")); + assert_eq!(out[1]["function_call_id"], json!("c2")); + assert_eq!(out[1]["session_id"], json!("s1")); + } + + // B4 — malformed entry (no call id) is dropped, other entries still flow. + #[test] + fn hydration_payloads_skips_entries_missing_call_id() { + let pending = vec![ + json!({ "function_id": "shell::fs::write", "status": "pending" }), // bad + json!({ + "function_call_id": "c2", + "function_id": "shell::fs::mkdir", + "status": "pending", + }), + ]; + let out = hydration_payloads("s1", &pending); + assert_eq!(out.len(), 1); + assert_eq!(out[0]["function_call_id"], json!("c2")); + } + + // B9 — empty input is a no-op, not an error. + #[test] + fn hydration_payloads_empty_input_returns_empty() { + assert!(hydration_payloads("s1", &[]).is_empty()); + } + + // approval_pushes_for fans a classified push out to N all-sessions + // browsers, producing (channel, payload) pairs. The pump's only job + // after classifying is to drive `iii.trigger` per pair, so pinning the + // channel naming convention here is enough to lock the wire format. + #[test] + fn approval_pushes_for_requested_targets_ui_approval_requested_per_browser() { + let push = ApprovalUiPush::Requested(json!({ "function_call_id": "c1", "session_id": "s1" })); + let out = approval_pushes_for(&push, &["b1".into(), "b2".into()]); + assert_eq!(out.len(), 2); + assert_eq!(out[0].0, "ui::approval::requested::b1"); + assert_eq!(out[0].1["function_call_id"], json!("c1")); + assert_eq!(out[1].0, "ui::approval::requested::b2"); + } + + #[test] + fn approval_pushes_for_resolved_targets_ui_approval_resolved_per_browser() { + let push = ApprovalUiPush::Resolved(json!({ "function_call_id": "c1", "tool_call_id": "c1" })); + let out = approval_pushes_for(&push, &["b1".into()]); + assert_eq!(out.len(), 1); + assert_eq!(out[0].0, "ui::approval::resolved::b1"); + } + + #[test] + fn approval_pushes_for_zero_browsers_is_noop() { + let push = ApprovalUiPush::Resolved(json!({ "function_call_id": "c1" })); + assert!(approval_pushes_for(&push, &[]).is_empty()); + } + + // hydration_pushes_for is the orchestration helper used when an + // all-sessions subscriber attaches. Given the per-session pending lists + // already fetched from approval::list_pending, it produces the exact + // (channel, payload) pairs the subscribe handler should push. + #[test] + fn hydration_pushes_for_emits_one_push_per_pending_entry_across_sessions() { + let per_session = vec![ + ( + "s1".to_string(), + vec![json!({ + "function_call_id": "c1", + "function_id": "shell::fs::write", + "status": "pending", + })], + ), + ( + "s2".to_string(), + vec![json!({ + "function_call_id": "c2", + "function_id": "shell::fs::mkdir", + "status": "pending", + })], + ), + ]; + let out = hydration_pushes_for("browser-a", &per_session); + assert_eq!(out.len(), 2); + assert!(out + .iter() + .all(|(chan, _)| chan == "ui::approval::requested::browser-a")); + let ids: Vec<&str> = out + .iter() + .map(|(_, p)| p["function_call_id"].as_str().unwrap()) + .collect(); + assert!(ids.contains(&"c1") && ids.contains(&"c2")); + } + + #[test] + fn hydration_pushes_for_empty_per_session_returns_empty() { + assert!(hydration_pushes_for("browser-a", &[]).is_empty()); + } + + #[test] + fn hydration_pushes_for_session_with_no_pending_is_skipped() { + let per_session = vec![ + ("s1".to_string(), vec![]), + ( + "s2".to_string(), + vec![json!({ + "function_call_id": "c2", + "status": "pending", + })], + ), + ]; + let out = hydration_pushes_for("b1", &per_session); + assert_eq!(out.len(), 1); + assert_eq!(out[0].1["function_call_id"], json!("c2")); + assert_eq!(out[0].1["session_id"], json!("s2")); + } + + // B11 — defense in depth: even if list_pending regresses and returns + // resolved entries, the hydration filter must drop them. This is the + // only guard against timed-out approvals reappearing on reconnect. + #[test] + fn hydration_payloads_filters_non_pending_status() { + let pending = vec![ + json!({ + "function_call_id": "c1", + "function_id": "shell::fs::write", + "status": "deny", + "reason": "timeout", + }), + json!({ + "function_call_id": "c2", + "function_id": "shell::fs::write", + "status": "allow", + }), + json!({ + "function_call_id": "c3", + "function_id": "shell::fs::write", + "status": "pending", + }), + ]; + let out = hydration_payloads("s1", &pending); + assert_eq!(out.len(), 1); + assert_eq!(out[0]["function_call_id"], json!("c3")); + } } diff --git a/harness/src/lib.rs b/harness/src/lib.rs index 4f457a5a..33633d1e 100644 --- a/harness/src/lib.rs +++ b/harness/src/lib.rs @@ -275,6 +275,7 @@ pub async fn register_with_iii_with_engine_url( let fanout = fanout::new_shared(); let fanout_for_subscribe = Arc::clone(&fanout); + let iii_for_subscribe = Arc::new(iii.clone()); let subscribe_fn = iii.register_function(( RegisterFunctionMessage::with_id("ui::subscribe".into()).with_description( "Register a browser's interest in a session (or all sessions if session_id is null)." @@ -282,6 +283,7 @@ pub async fn register_with_iii_with_engine_url( ), move |input: Value| { let fanout = Arc::clone(&fanout_for_subscribe); + let iii_for_hydrate = Arc::clone(&iii_for_subscribe); async move { let body = input.get("body").cloned().unwrap_or(input); let browser_id = body @@ -293,11 +295,29 @@ pub async fn register_with_iii_with_engine_url( .get("session_id") .and_then(Value::as_str) .map(str::to_string); + let is_all_sessions = session_id.is_none(); let total = { let mut state = fanout.write().await; - state.subscribe(browser_id, session_id); + state.subscribe(browser_id.clone(), session_id); state.browser_count() }; + // Fire-and-forget reactive hydration: replay any pending + // approvals into the new all-sessions subscriber. Replaces + // the old periodic approval poll. Per-session subscribers + // don't get pending-approval hydration (they only see live + // frames via the agent::events stream forward). + if is_all_sessions { + let fanout_for_hydrate = Arc::clone(&fanout); + let browser_for_hydrate = browser_id.clone(); + tokio::spawn(async move { + fanout::hydrate_all_sessions_subscriber( + iii_for_hydrate, + fanout_for_hydrate, + browser_for_hydrate, + ) + .await; + }); + } Ok::<_, IIIError>(json!({ "ok": true, "total_browsers": total, @@ -340,8 +360,13 @@ pub async fn register_with_iii_with_engine_url( // Wire the upstream fanout pumps: // - agent::events stream subscriber → ui::session::event:: + // → ui::approval::{requested,resolved}:: // - state::list poll → ui::sessions::changed:: // + // Pending-approval hydration runs on ui::subscribe for all-sessions + // browsers; live approval frames are forwarded by the agent::events + // subscriber (see classify_approval_frame). No periodic approval poll. + // // These spawn long-lived tasks; the returned handle ends them on // `unregister_all`. The `iii` clone here is fine — `III` is internally // ref-counted (it's already an `Arc<...>` inside the SDK).