diff --git a/app/src/lib/i18n/chunks/de-5.ts b/app/src/lib/i18n/chunks/de-5.ts index 79f041cc19..c8a26af5f3 100644 --- a/app/src/lib/i18n/chunks/de-5.ts +++ b/app/src/lib/i18n/chunks/de-5.ts @@ -523,28 +523,6 @@ const de5: TranslationMap = { 'settings.mascot.colorYellow': 'Gelb', 'settings.mascot.libraryUnavailable': 'OpenHuman Bibliothek nicht verfügbar', 'settings.mascot.title': 'OpenHuman', - 'settings.developerMenu.mcpServer.title': 'MCP-Server', - 'settings.developerMenu.mcpServer.desc': - 'Externe MCP-Clients zur Verbindung mit OpenHuman konfigurieren', - 'settings.mcpServer.title': 'MCP-Server', - 'settings.mcpServer.toolsSectionTitle': 'Verfügbare Tools', - 'settings.mcpServer.toolsSectionDesc': - 'Tools, die über den MCP-Stdio-Server bereitgestellt werden, wenn openhuman-core mcp ausgeführt wird', - 'settings.mcpServer.configSectionTitle': 'Client-Konfiguration', - 'settings.mcpServer.configSectionDesc': - 'Wählen Sie Ihren MCP-Client aus, um den passenden Konfigurations-Schnipsel zu erzeugen', - 'settings.mcpServer.copySnippet': 'In Zwischenablage kopieren', - 'settings.mcpServer.copied': 'Kopiert!', - 'settings.mcpServer.openConfigFile': 'Konfigurationsdatei öffnen', - 'settings.mcpServer.binaryPathNotFound': - 'OpenHuman-Binary nicht gefunden. Wenn Sie aus dem Quellcode arbeiten, bauen Sie mit: cargo build --bin openhuman-core', - 'settings.mcpServer.openConfigError': 'Konfigurationsdatei konnte nicht geöffnet werden', - 'settings.mcpServer.clientClaudeDesktop': 'Claude Desktop', - 'settings.mcpServer.clientCursor': 'Cursor', - 'settings.mcpServer.clientCodex': 'Codex', - 'settings.mcpServer.clientZed': 'Zed', - 'settings.mcpServer.configFilePath': 'Konfigurationsdatei', - 'settings.mcpServer.clientSelectorAriaLabel': 'MCP-Client-Auswahl', }; export default de5; diff --git a/src/openhuman/agent/harness/subagent_runner/ops.rs b/src/openhuman/agent/harness/subagent_runner/ops.rs index 9e54eb2fe5..46c9fc577d 100644 --- a/src/openhuman/agent/harness/subagent_runner/ops.rs +++ b/src/openhuman/agent/harness/subagent_runner/ops.rs @@ -1529,16 +1529,33 @@ async fn run_inner_loop( { let args = parse_tool_arguments(&call.arguments); let timeout = crate::openhuman::tool_timeout::tool_execution_timeout_duration(); - // ── External-effect approval gate (#1339) ───── + // ── External-effect approval gate (#1339, #2135) ─ // Subagents share the same gate as the parent loop; // see `tool_loop.rs` for the rationale. + // + // When the call is allowed and persisted, we keep + // hold of the `request_id` so we can stamp the + // terminal execution outcome onto the same audit + // row (issue #2135). + let mut approval_request_id: Option = None; + let mut approval_gate_for_audit: Option< + std::sync::Arc, + > = None; let gate_denial: Option = if tool.external_effect_with_args(&args) { if let Some(gate) = crate::openhuman::approval::ApprovalGate::try_global() { let summary = crate::openhuman::approval::summarize_action(&call.name, &args); let redacted = crate::openhuman::approval::redact_args(&args); - match gate.intercept(&call.name, &summary, redacted).await { - crate::openhuman::approval::GateOutcome::Allow => None, + let (outcome, request_id) = + gate.intercept_audited(&call.name, &summary, redacted).await; + match outcome { + crate::openhuman::approval::GateOutcome::Allow => { + approval_request_id = request_id; + if approval_request_id.is_some() { + approval_gate_for_audit = Some(gate); + } + None + } crate::openhuman::approval::GateOutcome::Deny { reason } => { tracing::warn!( tool = call.name.as_str(), @@ -1563,18 +1580,43 @@ async fn run_inner_loop( // (CodeRabbit review on PR #2149.) format!("Error: {reason}") } else { - match tokio::time::timeout(timeout, tool.execute(args)).await { - Ok(Ok(result)) => { - let raw = result.output(); - if result.is_error { - format!("Error: {raw}") - } else { - raw + let (raw, exec_success) = + match tokio::time::timeout(timeout, tool.execute(args)).await { + Ok(Ok(result)) => { + let raw = result.output(); + if result.is_error { + (format!("Error: {raw}"), false) + } else { + (raw, true) + } } - } - Ok(Err(err)) => format!("Error executing {}: {err}", call.name), - Err(_) => format!("Error: tool '{}' timed out", call.name), + Ok(Err(err)) => { + (format!("Error executing {}: {err}", call.name), false) + } + Err(_) => (format!("Error: tool '{}' timed out", call.name), false), + }; + // Stamp the terminal status onto the + // pending_approvals audit row — best-effort, + // failures don't propagate to the agent (#2135). + // Success comes from the structured execute result, + // not from parsing `raw.starts_with("Error")` — a + // legitimate success payload can start with "Error" + // (search hits, copied logs), which would otherwise + // persist a false Failure (CodeRabbit review on #2367). + if let (Some(gate), Some(req_id)) = ( + approval_gate_for_audit.as_ref(), + approval_request_id.as_ref(), + ) { + let success = exec_success; + let exec_outcome = if success { + crate::openhuman::approval::ExecutionOutcome::Success + } else { + crate::openhuman::approval::ExecutionOutcome::Failure + }; + let err_text = if success { None } else { Some(raw.as_str()) }; + gate.record_execution(req_id, exec_outcome, err_text); } + raw } } else { format!("Unknown tool: {}", call.name) diff --git a/src/openhuman/agent/harness/tool_loop.rs b/src/openhuman/agent/harness/tool_loop.rs index 13f7f68af0..c473fa1d26 100644 --- a/src/openhuman/agent/harness/tool_loop.rs +++ b/src/openhuman/agent/harness/tool_loop.rs @@ -680,12 +680,25 @@ pub(crate) async fn run_tool_call_loop( } } - // ── External-effect approval gate (#1339) ───────── + // ── External-effect approval gate (#1339, #2135) ── // Tools whose `external_effect()` returns true route // through the process-global `ApprovalGate` so the UI // can prompt the user before `execute()` runs. The gate // is `None` when supervised mode is disabled or in test // envs — behavior matches the pre-#1339 path. + // + // `approval_request_id` carries the persisted row id + // forward so we can stamp the terminal execution + // outcome onto the same `pending_approvals` row after + // the tool finishes (issue #2135). `None` means the + // tool was either not gated (no supervised gate, not + // external-effect), was session-allowlist-shortcutted, + // or was denied — none of which produce an audit row + // that needs an "after" entry. + let mut approval_request_id: Option = None; + let mut approval_gate_for_audit: Option< + std::sync::Arc, + > = None; if let Some(tool) = tool_opt { if tool.external_effect_with_args(&call.arguments) { if let Some(gate) = crate::openhuman::approval::ApprovalGate::try_global() { @@ -694,8 +707,15 @@ pub(crate) async fn run_tool_call_loop( &call.arguments, ); let redacted = crate::openhuman::approval::redact_args(&call.arguments); - match gate.intercept(&call.name, &summary, redacted).await { - crate::openhuman::approval::GateOutcome::Allow => {} + let (outcome, request_id) = + gate.intercept_audited(&call.name, &summary, redacted).await; + match outcome { + crate::openhuman::approval::GateOutcome::Allow => { + approval_request_id = request_id; + if approval_request_id.is_some() { + approval_gate_for_audit = Some(gate); + } + } crate::openhuman::approval::GateOutcome::Deny { reason } => { tracing::warn!( iteration, @@ -890,6 +910,29 @@ pub(crate) async fn run_tool_call_loop( log::warn!("[agent_loop] progress sink closed while emitting ToolCallCompleted: {e}"); } } + // ── Approval audit after-action row (#2135) ──── + // Stamp the terminal status onto the same + // `pending_approvals` row the gate created before + // execution, so the audit trail carries both the + // before (approval) and after (executed_at + + // outcome). Best-effort: a write failure here is + // logged but not propagated to the agent. + if let (Some(gate), Some(req_id)) = ( + approval_gate_for_audit.as_ref(), + approval_request_id.as_ref(), + ) { + let exec_outcome = if success { + crate::openhuman::approval::ExecutionOutcome::Success + } else { + crate::openhuman::approval::ExecutionOutcome::Failure + }; + let err_text = if success { + None + } else { + Some(result_text.as_str()) + }; + gate.record_execution(req_id, exec_outcome, err_text); + } result_text } else { tracing::warn!( diff --git a/src/openhuman/agent/triage/escalation.rs b/src/openhuman/agent/triage/escalation.rs index 0a79cf8756..2c38d61f47 100644 --- a/src/openhuman/agent/triage/escalation.rs +++ b/src/openhuman/agent/triage/escalation.rs @@ -94,6 +94,10 @@ pub async fn apply_decision(run: TriageRun, envelope: &TriggerEnvelope) -> anyho // still applies — defense in depth, not duplication // (each gate is short-circuited by the session // allowlist after the first approval). + let mut approval_request_id: Option = None; + let mut approval_gate_for_audit: Option< + std::sync::Arc, + > = None; if let Some(gate) = crate::openhuman::approval::ApprovalGate::try_global() { let summary = format!( "triage::{} target={} prompt_chars={}", @@ -109,8 +113,15 @@ pub async fn apply_decision(run: TriageRun, envelope: &TriggerEnvelope) -> anyho "prompt_chars": prompt.chars().count(), }); let tool_key = format!("triage.{}", run.decision.action.as_str()); - match gate.intercept(&tool_key, &summary, redacted).await { - crate::openhuman::approval::GateOutcome::Allow => {} + let (outcome, request_id) = + gate.intercept_audited(&tool_key, &summary, redacted).await; + match outcome { + crate::openhuman::approval::GateOutcome::Allow => { + approval_request_id = request_id; + if approval_request_id.is_some() { + approval_gate_for_audit = Some(gate); + } + } crate::openhuman::approval::GateOutcome::Deny { reason } => { tracing::warn!( action = %action_str, @@ -128,7 +139,24 @@ pub async fn apply_decision(run: TriageRun, envelope: &TriggerEnvelope) -> anyho } } - match dispatch_target_agent(target, prompt).await { + let dispatch_result = dispatch_target_agent(target, prompt).await; + // Record terminal status on the approval audit row + // (#2135). Best-effort: write errors are logged inside + // record_execution and never propagate to the caller. + if let (Some(gate), Some(req_id)) = ( + approval_gate_for_audit.as_ref(), + approval_request_id.as_ref(), + ) { + let (exec_outcome, err_text) = match &dispatch_result { + Ok(_) => (crate::openhuman::approval::ExecutionOutcome::Success, None), + Err(e) => ( + crate::openhuman::approval::ExecutionOutcome::Failure, + Some(e.to_string()), + ), + }; + gate.record_execution(req_id, exec_outcome, err_text.as_deref()); + } + match dispatch_result { Ok(output) => { tracing::info!( target_agent = %target, diff --git a/src/openhuman/approval/gate.rs b/src/openhuman/approval/gate.rs index 01702b311f..4006b109d2 100644 --- a/src/openhuman/approval/gate.rs +++ b/src/openhuman/approval/gate.rs @@ -35,7 +35,7 @@ use crate::core::event_bus::{publish_global, DomainEvent}; use crate::openhuman::config::Config; use super::store; -use super::types::{ApprovalDecision, GateOutcome, PendingApproval}; +use super::types::{ApprovalDecision, ExecutionOutcome, GateOutcome, PendingApproval}; /// How long the gate will park a future before timing out and /// returning `Deny`. 10 minutes matches the default `expires_at` @@ -92,12 +92,43 @@ impl ApprovalGate { /// Intercept a tool call. Blocks until the user decides or the /// TTL elapses (timeout → `Deny`). + /// + /// Use [`Self::intercept_audited`] instead when the caller can + /// also record the *terminal* status of the tool — the audit + /// trail in `pending_approvals` only carries before-and-after + /// rows when both sides report in. See #2135. pub async fn intercept( &self, tool_name: &str, action_summary: &str, args_redacted: serde_json::Value, ) -> GateOutcome { + // Drop the request_id; callers using the legacy entry point + // don't record execution. + self.intercept_audited(tool_name, action_summary, args_redacted) + .await + .0 + } + + /// Audited variant of [`Self::intercept`]. + /// + /// Returns `(outcome, Some(request_id))` when the call was + /// allowed AND a `pending_approvals` row was persisted — pass + /// the id back to [`Self::record_execution`] once the tool + /// finishes so the audit row carries both the approval and the + /// terminal status (issue #2135). + /// + /// Returns `(outcome, None)` when no DB row was created (session + /// allowlist shortcut) OR when the call was denied. In either + /// case there is nothing to record afterward, so the caller can + /// pattern-match `(GateOutcome::Allow, Some(id))` to decide + /// whether to invoke `record_execution`. + pub async fn intercept_audited( + &self, + tool_name: &str, + action_summary: &str, + args_redacted: serde_json::Value, + ) -> (GateOutcome, Option) { // Session-scoped allowlist shortcut — set by prior // ApproveAlwaysForTool decisions in this launch. { @@ -107,7 +138,7 @@ impl ApprovalGate { tool = tool_name, "[approval::gate] session-allowlist hit, skipping prompt" ); - return GateOutcome::Allow; + return (GateOutcome::Allow, None); } } @@ -142,11 +173,14 @@ impl ApprovalGate { tool = tool_name, "[approval::gate] failed to persist pending row — failing closed" ); - return GateOutcome::Deny { - reason: format!( - "Approval gate could not persist the request — denying for safety: {err}" - ), - }; + return ( + GateOutcome::Deny { + reason: format!( + "Approval gate could not persist the request — denying for safety: {err}" + ), + }, + None, + ); } publish_global(DomainEvent::ApprovalRequested { @@ -172,11 +206,14 @@ impl ApprovalGate { "[approval::gate] decision received" ); if decision.is_approve() { - GateOutcome::Allow + (GateOutcome::Allow, Some(request_id)) } else { - GateOutcome::Deny { - reason: format!("User denied '{tool_name}' execution."), - } + ( + GateOutcome::Deny { + reason: format!("User denied '{tool_name}' execution."), + }, + None, + ) } } Ok(Err(_canceled)) => { @@ -188,31 +225,94 @@ impl ApprovalGate { "[approval::gate] decision channel dropped — denying" ); let _ = store::decide(&self.config, &request_id, ApprovalDecision::Deny); - GateOutcome::Deny { - reason: format!( - "Approval channel for '{tool_name}' closed before a decision was made." - ), - } + ( + GateOutcome::Deny { + reason: format!( + "Approval channel for '{tool_name}' closed before a decision was made." + ), + }, + None, + ) } Err(_elapsed) => { self.evict_waiter(&request_id); - let _ = store::decide(&self.config, &request_id, ApprovalDecision::Deny); + // Race: `decide()` may have committed an Approve in + // SQLite right as the TTL elapsed. `store::decide(Deny)` + // has `WHERE decided_at IS NULL` so it won't overwrite, + // but without a re-read we'd return Deny here while the + // durable audit row says Approved (CodeRabbit review on + // #2367). Try to deny; if the row was already decided, + // honor the persisted decision. + let denied = store::decide(&self.config, &request_id, ApprovalDecision::Deny); + let persisted = match &denied { + Ok(Some(_)) => Some(ApprovalDecision::Deny), + Ok(None) => store::get_decision(&self.config, &request_id) + .ok() + .flatten(), + Err(_) => None, + }; + if matches!(persisted, Some(d) if d.is_approve()) { + tracing::info!( + request_id = %request_id, + tool = tool_name, + ttl_secs = self.ttl.as_secs(), + "[approval::gate] timeout race: persisted decision was Approve, honoring approval" + ); + return (GateOutcome::Allow, Some(request_id)); + } tracing::warn!( request_id = %request_id, tool = tool_name, ttl_secs = self.ttl.as_secs(), "[approval::gate] approval timed out, denying" ); - GateOutcome::Deny { - reason: format!( - "Approval for '{tool_name}' timed out after {}s.", - self.ttl.as_secs() - ), - } + ( + GateOutcome::Deny { + reason: format!( + "Approval for '{tool_name}' timed out after {}s.", + self.ttl.as_secs() + ), + }, + None, + ) } } } + /// Write the *terminal* status of a tool call onto its approval + /// audit row — see [`store::record_execution`] for semantics. + /// + /// Logs (but does not propagate) write errors: the tool has + /// already run, so audit-log loss should never bubble up as a + /// tool execution failure to the agent. If durable audit storage + /// is required for compliance, callers wire it via a stronger + /// guarantee than this best-effort hook. + pub fn record_execution( + &self, + request_id: &str, + outcome: ExecutionOutcome, + error: Option<&str>, + ) { + match store::record_execution(&self.config, request_id, outcome, error) { + Ok(true) => tracing::debug!( + request_id = %request_id, + outcome = outcome.as_str(), + "[approval::gate] recorded terminal execution" + ), + Ok(false) => tracing::warn!( + request_id = %request_id, + outcome = outcome.as_str(), + "[approval::gate] record_execution found no matching decided row" + ), + Err(err) => tracing::error!( + request_id = %request_id, + outcome = outcome.as_str(), + error = %err, + "[approval::gate] record_execution write failed" + ), + } + } + /// Apply a user decision. Returns the now-decided /// [`PendingApproval`] row when one was found. pub fn decide( @@ -282,7 +382,13 @@ mod tests { ..Config::default() }; let session = format!("test-session-{}", uuid::Uuid::new_v4()); - let gate = ApprovalGate::new(config, session, Duration::from_millis(500)); + // 500ms TTL was racing the 50×10ms poll loop on slow CI + // runners — the row would expire (and get denied by + // list_pending's lazy-expire) before `decide` could fire, + // surfacing as "pending row never appeared". 2s gives the + // polling tests enough headroom while keeping + // `timeout_returns_deny` fast (PR #2367 CI flake). + let gate = ApprovalGate::new(config, session, Duration::from_secs(2)); (gate, dir) } @@ -392,4 +498,97 @@ mod tests { .unwrap(); assert!(decided.is_none()); } + + #[tokio::test] + async fn intercept_audited_returns_request_id_only_when_allowed_and_persisted() { + let (gate, _dir) = test_gate(); + let gate = Arc::new(gate); + + // Allow path: the audited variant must hand back the + // request_id so the caller can record_execution later + // (issue #2135). + let g = gate.clone(); + let handle = tokio::spawn(async move { + g.intercept_audited("composio", "send slack", serde_json::json!({})) + .await + }); + let pending = loop { + if let Some(p) = gate.list_pending().unwrap().into_iter().next() { + break p; + } + tokio::time::sleep(Duration::from_millis(10)).await; + }; + gate.decide(&pending.request_id, ApprovalDecision::ApproveOnce) + .unwrap(); + let (outcome, id) = handle.await.unwrap(); + assert!(matches!(outcome, GateOutcome::Allow)); + assert_eq!( + id.as_deref(), + Some(pending.request_id.as_str()), + "allowed call must return its persisted request id" + ); + + // Now record execution against that id. Round-trip via a + // fresh gate to prove the row landed in durable storage. + gate.record_execution(&pending.request_id, ExecutionOutcome::Success, None); + } + + #[tokio::test] + async fn intercept_audited_returns_none_id_for_denied_and_allowlisted() { + let (gate, _dir) = test_gate(); + let gate = Arc::new(gate); + + // Deny path → no id (nothing to record afterward). + let g = gate.clone(); + let denied = tokio::spawn(async move { + g.intercept_audited("composio", "send slack", serde_json::json!({})) + .await + }); + let pending = loop { + if let Some(p) = gate.list_pending().unwrap().into_iter().next() { + break p; + } + tokio::time::sleep(Duration::from_millis(10)).await; + }; + gate.decide(&pending.request_id, ApprovalDecision::Deny) + .unwrap(); + let (outcome, id) = denied.await.unwrap(); + assert!(matches!(outcome, GateOutcome::Deny { .. })); + assert!(id.is_none(), "denied calls have nothing to record"); + + // Allowlist-shortcut path → also no id (no row was created). + let g = gate.clone(); + let first = tokio::spawn(async move { + g.intercept_audited("pushover", "first send", serde_json::json!({})) + .await + }); + let pending = loop { + if let Some(p) = gate + .list_pending() + .unwrap() + .into_iter() + .find(|p| p.tool_name == "pushover") + { + break p; + } + tokio::time::sleep(Duration::from_millis(10)).await; + }; + gate.decide(&pending.request_id, ApprovalDecision::ApproveAlwaysForTool) + .unwrap(); + let (first_outcome, first_id) = first.await.unwrap(); + assert!(matches!(first_outcome, GateOutcome::Allow)); + assert!( + first_id.is_some(), + "the prompting call still persists a row" + ); + + let (second_outcome, second_id) = gate + .intercept_audited("pushover", "second send", serde_json::json!({})) + .await; + assert!(matches!(second_outcome, GateOutcome::Allow)); + assert!( + second_id.is_none(), + "session-allowlist shortcut must not persist a row, so no id to record against" + ); + } } diff --git a/src/openhuman/approval/mod.rs b/src/openhuman/approval/mod.rs index 7b61c2e896..efbf517496 100644 --- a/src/openhuman/approval/mod.rs +++ b/src/openhuman/approval/mod.rs @@ -26,4 +26,6 @@ pub use ops::*; pub use redact::{redact_args, summarize_action}; pub use schemas::all_controller_schemas as all_approval_controller_schemas; pub use schemas::all_registered_controllers as all_approval_registered_controllers; -pub use types::{ApprovalAuditEntry, ApprovalDecision, GateOutcome, PendingApproval}; +pub use types::{ + ApprovalAuditEntry, ApprovalDecision, ExecutionOutcome, GateOutcome, PendingApproval, +}; diff --git a/src/openhuman/approval/store.rs b/src/openhuman/approval/store.rs index 0696f8c7bc..031369f029 100644 --- a/src/openhuman/approval/store.rs +++ b/src/openhuman/approval/store.rs @@ -25,22 +25,34 @@ use chrono::{DateTime, Utc}; use rusqlite::{params, types::Type, Connection}; use crate::openhuman::config::Config; +use crate::openhuman::memory::safety::sanitize_text; -use super::types::{ApprovalAuditEntry, ApprovalDecision, PendingApproval}; +use super::types::{ApprovalAuditEntry, ApprovalDecision, ExecutionOutcome, PendingApproval}; +/// SQL schema applied on every `with_connection` call. +/// +/// `executed_at`, `execution_outcome`, and `execution_error` capture +/// the *after-action* audit row introduced for issue #2135 so a +/// reader can see both "the action was approved at X" and "the +/// action ran at Y with outcome Z" from the same table. Pre-existing +/// rows from older builds back-fill these as NULL — see +/// [`migrate_columns`] for the live-upgrade path. const SCHEMA: &str = " PRAGMA foreign_keys = ON; CREATE TABLE IF NOT EXISTS pending_approvals ( - request_id TEXT PRIMARY KEY, - tool_name TEXT NOT NULL, - action_summary TEXT NOT NULL, - args_redacted TEXT NOT NULL, - session_id TEXT NOT NULL, - created_at TEXT NOT NULL, - expires_at TEXT, - decided_at TEXT, - decision TEXT + request_id TEXT PRIMARY KEY, + tool_name TEXT NOT NULL, + action_summary TEXT NOT NULL, + args_redacted TEXT NOT NULL, + session_id TEXT NOT NULL, + created_at TEXT NOT NULL, + expires_at TEXT, + decided_at TEXT, + decision TEXT, + executed_at TEXT, + execution_outcome TEXT, + execution_error TEXT ); CREATE INDEX IF NOT EXISTS idx_pending_approvals_pending ON pending_approvals(decided_at); @@ -48,6 +60,49 @@ CREATE INDEX IF NOT EXISTS idx_pending_approvals_session ON pending_approvals(session_id); "; +/// Idempotently add the post-execution audit columns to an existing +/// `pending_approvals` table. `CREATE TABLE IF NOT EXISTS` above is +/// a no-op when the table already exists, so a DB created by an +/// older build keeps the v1 schema until this migration patches it. +/// +/// SQLite has no `ADD COLUMN IF NOT EXISTS`, so we read +/// `PRAGMA table_info` and add missing columns one at a time. +fn migrate_columns(conn: &Connection) -> Result<()> { + let mut have: std::collections::HashSet = std::collections::HashSet::new(); + let mut stmt = conn + .prepare("PRAGMA table_info(pending_approvals)") + .context("[approval::store] prepare table_info")?; + let rows = stmt + .query_map(params![], |row| row.get::<_, String>(1)) + .context("[approval::store] query table_info")?; + for r in rows { + have.insert(r.context("[approval::store] table_info row decode")?); + } + for (col, ddl) in [ + ( + "executed_at", + "ALTER TABLE pending_approvals ADD COLUMN executed_at TEXT", + ), + ( + "execution_outcome", + "ALTER TABLE pending_approvals ADD COLUMN execution_outcome TEXT", + ), + ( + "execution_error", + "ALTER TABLE pending_approvals ADD COLUMN execution_error TEXT", + ), + ] { + if !have.contains(col) { + conn.execute(ddl, params![]) + .with_context(|| format!("[approval::store] add column {col}"))?; + tracing::info!(column = col, "[approval::store] migrated v1 schema"); + } + } + Ok(()) +} + +/// Open (and migrate) the approval DB, then call `f` with a live +/// connection. Mirrors `notifications/store.rs::with_connection`. fn with_connection(config: &Config, f: impl FnOnce(&Connection) -> Result) -> Result { let db_path = config.workspace_dir.join("approval").join("approval.db"); @@ -74,6 +129,7 @@ fn with_connection(config: &Config, f: impl FnOnce(&Connection) -> Result) conn.execute_batch(SCHEMA) .context("[approval::store] schema migration failed")?; + migrate_columns(&conn)?; f(&conn) } @@ -143,6 +199,33 @@ pub fn list_pending(config: &Config) -> Result> { }) } +/// Look up the persisted decision for a request_id without mutating +/// state. Returns `Ok(None)` when the row doesn't exist or hasn't +/// been decided yet. Used to resolve gate-timeout vs decide races +/// where the TTL elapses concurrently with a committed approval +/// (CodeRabbit review on PR #2367). +pub fn get_decision(config: &Config, request_id: &str) -> Result> { + with_connection(config, |conn| { + let mut stmt = conn + .prepare( + "SELECT decision FROM pending_approvals + WHERE request_id = ?1 AND decided_at IS NOT NULL", + ) + .context("[approval::store] prepare get_decision")?; + let mut rows = stmt + .query(params![request_id]) + .context("[approval::store] query get_decision")?; + if let Some(row) = rows.next().context("[approval::store] get_decision next")? { + let raw: String = row + .get(0) + .context("[approval::store] get_decision decode")?; + Ok(ApprovalDecision::from_str(&raw)) + } else { + Ok(None) + } + }) +} + /// Mark a pending row as decided and return the now-decided row. /// Returns `Ok(None)` if no row matched (already decided, expired, or /// unknown id). @@ -185,6 +268,70 @@ pub fn decide( }) } +/// Persist the terminal status of a tool call the gate previously +/// allowed. +/// +/// Writes `executed_at = now`, `execution_outcome`, and an optional +/// short error string back onto the original `pending_approvals` +/// row. Returns `Ok(true)` when the row was found and updated, +/// `Ok(false)` when no matching row exists (gate not installed, or +/// a stray `record_execution` for an id that was never persisted) — +/// the latter is a no-op so callers can fire it unconditionally +/// without branching on `Option`. +/// +/// **Invariant:** only call this AFTER `decide(..., ApproveOnce | +/// ApproveAlwaysForTool)` has succeeded — otherwise the row will +/// show an `executed_at` without a `decided_at`, which is nonsense. +/// The gate enforces this by only handing out a request_id when the +/// intercepted call was allowed. +pub fn record_execution( + config: &Config, + request_id: &str, + outcome: ExecutionOutcome, + error: Option<&str>, +) -> Result { + with_connection(config, |conn| { + let now = Utc::now().to_rfc3339(); + // Sanitize before truncation so the durable audit row can't + // leak bearer tokens, API keys, private-key blocks, OAuth + // params, emails, or other PII the upstream tool might have + // echoed back into its error message (PR #2367 review). + // Truncate-first would split a secret mid-string and dodge + // the redaction regexes — sanitize, then cap. Cap is 512 + // chars inclusive of the ellipsis marker; the agent already + // sees the full error in its own tool-result envelope so + // nothing observable depends on the stored copy. + let trimmed_error = error.map(|raw| { + let sanitized = sanitize_text(raw).value; + if sanitized.chars().count() > 512 { + let head: String = sanitized.chars().take(511).collect(); + format!("{head}…") + } else { + sanitized + } + }); + // `executed_at IS NULL` makes the terminal audit row + // immutable — the first `record_execution` call wins, and a + // late retry/cleanup path can't silently rewrite the original + // outcome (CodeRabbit review on #2367). `decided_at IS NOT + // NULL` keeps the monotonic invariant (no "executed before + // approved" rows). + let updated = conn + .execute( + "UPDATE pending_approvals + SET executed_at = ?1, + execution_outcome = ?2, + execution_error = ?3 + WHERE request_id = ?4 + AND decided_at IS NOT NULL + AND executed_at IS NULL", + params![now, outcome.as_str(), trimmed_error, request_id], + ) + .context("[approval::store] record_execution update")?; + Ok(updated > 0) + }) +} + /// List recently decided approval rows for durable audit views. pub fn list_recent_decisions(config: &Config, limit: usize) -> Result> { let limit = limit.clamp(1, 500); @@ -436,6 +583,26 @@ mod tests { assert_eq!(remaining[0].request_id, "p3"); } + #[test] + fn get_decision_returns_none_until_decided_then_persisted_value() { + // PR #2367 review: timeout-vs-decide race resolution in the + // gate calls `get_decision` after a denied UPDATE no-ops. + // Undecided rows and unknown ids must both return `None`, + // and decided rows must round-trip the persisted decision. + let (config, _dir) = test_config(); + assert!(get_decision(&config, "missing").unwrap().is_none()); + insert_pending(&config, &sample("race", "sess-A")).unwrap(); + assert!( + get_decision(&config, "race").unwrap().is_none(), + "undecided row reports no decision" + ); + decide(&config, "race", ApprovalDecision::ApproveOnce).unwrap(); + assert_eq!( + get_decision(&config, "race").unwrap(), + Some(ApprovalDecision::ApproveOnce) + ); + } + #[test] fn pending_row_survives_connection_close() { let (config, _dir) = test_config(); @@ -445,6 +612,226 @@ mod tests { assert_eq!(rows[0].request_id, "survives"); } + // ── record_execution / column-migration tests (#2135) ────────── + + fn read_execution_row( + config: &Config, + request_id: &str, + ) -> (Option, Option, Option) { + with_connection(config, |conn| { + let mut stmt = conn + .prepare( + "SELECT executed_at, execution_outcome, execution_error + FROM pending_approvals WHERE request_id = ?1", + ) + .unwrap(); + let mut rows = stmt.query(params![request_id]).unwrap(); + let row = rows.next().unwrap().expect("row exists"); + Ok(( + row.get::<_, Option>(0).unwrap(), + row.get::<_, Option>(1).unwrap(), + row.get::<_, Option>(2).unwrap(), + )) + }) + .unwrap() + } + + #[test] + fn record_execution_writes_terminal_audit_row_after_decide() { + let (config, _dir) = test_config(); + insert_pending(&config, &sample("req-exec", "sess-A")).unwrap(); + // Before decide, record_execution must not patch the row — + // a decided_at IS NOT NULL guard keeps the audit trail + // monotonic (no "executed before approved"). + let early = record_execution(&config, "req-exec", ExecutionOutcome::Success, None).unwrap(); + assert!(!early, "record_execution before decide must be a no-op"); + let (exec_at, _, _) = read_execution_row(&config, "req-exec"); + assert!(exec_at.is_none()); + + decide(&config, "req-exec", ApprovalDecision::ApproveOnce).unwrap(); + let ok = record_execution(&config, "req-exec", ExecutionOutcome::Success, None).unwrap(); + assert!(ok, "record_execution after decide must update the row"); + let (exec_at, outcome, error) = read_execution_row(&config, "req-exec"); + assert!(exec_at.is_some()); + assert_eq!(outcome.as_deref(), Some("success")); + assert!(error.is_none()); + } + + #[test] + fn record_execution_persists_outcome_and_redacted_error() { + let (config, _dir) = test_config(); + insert_pending(&config, &sample("req-fail", "sess-A")).unwrap(); + decide(&config, "req-fail", ApprovalDecision::ApproveOnce).unwrap(); + + record_execution( + &config, + "req-fail", + ExecutionOutcome::Failure, + Some("backend returned 500"), + ) + .unwrap(); + + let (_, outcome, error) = read_execution_row(&config, "req-fail"); + assert_eq!(outcome.as_deref(), Some("failure")); + assert_eq!(error.as_deref(), Some("backend returned 500")); + } + + #[test] + fn record_execution_caps_long_error_messages() { + let (config, _dir) = test_config(); + insert_pending(&config, &sample("req-long", "sess-A")).unwrap(); + decide(&config, "req-long", ApprovalDecision::ApproveOnce).unwrap(); + + let huge = "x".repeat(2_000); + record_execution(&config, "req-long", ExecutionOutcome::Failure, Some(&huge)).unwrap(); + + let (_, _, error) = read_execution_row(&config, "req-long"); + let stored = error.expect("error stored"); + // 512-char cap is inclusive of the ellipsis marker + // (CodeRabbit review on #2367) — anything longer would let + // upstream crash dumps slowly fill the audit log. + assert_eq!( + stored.chars().count(), + 512, + "truncated value must be exactly 512 chars (incl. ellipsis): {} chars", + stored.chars().count() + ); + assert!(stored.ends_with('…')); + } + + #[test] + fn record_execution_redacts_secrets_in_error_message() { + // PR #2367 review: upstream tool errors regularly echo back + // the offending request including auth headers. The audit + // row must persist the sanitized form so a leaked bearer + // or API key never lands in the durable log. + let (config, _dir) = test_config(); + insert_pending(&config, &sample("req-secret", "sess-A")).unwrap(); + decide(&config, "req-secret", ApprovalDecision::ApproveOnce).unwrap(); + + let raw = "upstream 401: Authorization: Bearer sk-live-abcdef1234567890abcdef1234567890 \ + returned by sk-proj-FAKEFAKEFAKEFAKEFAKEFAKEFAKEFAKEFAKEFAKEFAKEFAKE"; + record_execution(&config, "req-secret", ExecutionOutcome::Failure, Some(raw)).unwrap(); + + let (_, _, error) = read_execution_row(&config, "req-secret"); + let stored = error.expect("error stored"); + assert!( + !stored.contains("sk-live-abcdef1234567890abcdef1234567890"), + "raw bearer token must not be persisted: {stored}" + ); + assert!( + !stored.contains("sk-proj-FAKEFAKEFAKEFAKEFAKEFAKEFAKEFAKEFAKEFAKEFAKEFAKE"), + "raw provider key must not be persisted: {stored}" + ); + assert!( + stored.contains("[REDACTED]"), + "sanitizer must leave a redaction marker so audit reviewers see something was scrubbed: {stored}" + ); + } + + #[test] + fn record_execution_is_idempotent_after_first_terminal_report_wins() { + // CodeRabbit review on #2367: a late retry / cleanup path + // must NOT rewrite the original audit row. The first + // `record_execution` call wins; subsequent calls return + // `false` and leave the row unchanged. + let (config, _dir) = test_config(); + insert_pending(&config, &sample("req-idem", "sess-A")).unwrap(); + decide(&config, "req-idem", ApprovalDecision::ApproveOnce).unwrap(); + + // First report: succeeds, row gets stamped. + let first = record_execution( + &config, + "req-idem", + ExecutionOutcome::Success, + Some("ok-first"), + ) + .unwrap(); + assert!(first); + let (exec_at_1, outcome_1, error_1) = read_execution_row(&config, "req-idem"); + assert!(exec_at_1.is_some()); + assert_eq!(outcome_1.as_deref(), Some("success")); + assert_eq!(error_1.as_deref(), Some("ok-first")); + + // Second report (e.g. a late retry that finally noticed the + // outcome) must be a no-op and must NOT change the stored + // outcome or timestamp. + let second = record_execution( + &config, + "req-idem", + ExecutionOutcome::Failure, + Some("late-failure-noise"), + ) + .unwrap(); + assert!( + !second, + "second record_execution must report no row updated" + ); + + let (exec_at_2, outcome_2, error_2) = read_execution_row(&config, "req-idem"); + assert_eq!(exec_at_2, exec_at_1, "executed_at must not change"); + assert_eq!(outcome_2.as_deref(), Some("success")); + assert_eq!(error_2.as_deref(), Some("ok-first")); + } + + #[test] + fn record_execution_unknown_id_is_safe_noop() { + let (config, _dir) = test_config(); + let ok = record_execution(&config, "never-here", ExecutionOutcome::Success, None).unwrap(); + assert!(!ok, "unknown id must report no row updated"); + } + + #[test] + fn migrate_columns_is_idempotent_on_v1_databases() { + // Simulate an older build by creating the v1 table shape + // manually (no executed_at / execution_outcome / execution_error) + // then opening the store via with_connection — the migration + // must add the missing columns without losing existing rows. + let dir = TempDir::new().unwrap(); + let workspace = dir.path().to_path_buf(); + let db_path = workspace.join("approval").join("approval.db"); + std::fs::create_dir_all(db_path.parent().unwrap()).unwrap(); + { + let conn = Connection::open(&db_path).unwrap(); + conn.execute_batch( + "CREATE TABLE pending_approvals ( + request_id TEXT PRIMARY KEY, + tool_name TEXT NOT NULL, + action_summary TEXT NOT NULL, + args_redacted TEXT NOT NULL, + session_id TEXT NOT NULL, + created_at TEXT NOT NULL, + expires_at TEXT, + decided_at TEXT, + decision TEXT + );", + ) + .unwrap(); + conn.execute( + "INSERT INTO pending_approvals + (request_id, tool_name, action_summary, args_redacted, + session_id, created_at) + VALUES ('legacy', 'composio', 'legacy row', '{}', 'sess-X', ?1)", + params![Utc::now().to_rfc3339()], + ) + .unwrap(); + } + let config = Config { + workspace_dir: workspace, + ..Config::default() + }; + // First open triggers the migration; existing row survives. + let rows = list_pending(&config).unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].request_id, "legacy"); + // After migration, record_execution must work end-to-end. + decide(&config, "legacy", ApprovalDecision::ApproveOnce).unwrap(); + assert!(record_execution(&config, "legacy", ExecutionOutcome::Success, None).unwrap()); + // Second open must be a no-op (migration is idempotent). + let rows = list_pending(&config).unwrap(); + assert!(rows.is_empty(), "decided rows should not appear in pending"); + } + #[test] fn list_pending_expires_stale_rows_before_returning() { let (config, _dir) = test_config(); diff --git a/src/openhuman/approval/types.rs b/src/openhuman/approval/types.rs index 22f513ba68..a095f16669 100644 --- a/src/openhuman/approval/types.rs +++ b/src/openhuman/approval/types.rs @@ -86,6 +86,45 @@ pub enum GateOutcome { Deny { reason: String }, } +/// Terminal status of a tool action that the gate previously allowed. +/// +/// Recorded after the tool finishes so the audit row in +/// `pending_approvals` carries a full before-and-after trail per the +/// issue #2135 acceptance criterion. The variant set is intentionally +/// small — anything richer belongs in the structured tool result, +/// not the approval audit row. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ExecutionOutcome { + /// Tool ran and returned a non-error [`ToolResult`]. + Success, + /// Tool ran and returned an error [`ToolResult`] (or panicked). + Failure, + /// Tool did not run because the runtime aborted (timeout, + /// cancellation, supervisor shutdown). + Aborted, +} + +impl ExecutionOutcome { + pub fn as_str(self) -> &'static str { + match self { + Self::Success => "success", + Self::Failure => "failure", + Self::Aborted => "aborted", + } + } + + #[allow(clippy::should_implement_trait)] + pub fn from_str(s: &str) -> Option { + match s { + "success" => Some(Self::Success), + "failure" => Some(Self::Failure), + "aborted" => Some(Self::Aborted), + _ => None, + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -118,4 +157,28 @@ mod tests { let s = serde_json::to_string(&ApprovalDecision::ApproveAlwaysForTool).unwrap(); assert_eq!(s, "\"approve_always_for_tool\""); } + + #[test] + fn execution_outcome_round_trips() { + for o in [ + ExecutionOutcome::Success, + ExecutionOutcome::Failure, + ExecutionOutcome::Aborted, + ] { + assert_eq!(ExecutionOutcome::from_str(o.as_str()), Some(o)); + } + assert!(ExecutionOutcome::from_str("partial").is_none()); + } + + #[test] + fn execution_outcome_serializes_as_snake_case() { + assert_eq!( + serde_json::to_string(&ExecutionOutcome::Success).unwrap(), + "\"success\"" + ); + assert_eq!( + serde_json::to_string(&ExecutionOutcome::Aborted).unwrap(), + "\"aborted\"" + ); + } } diff --git a/src/openhuman/channels/proactive.rs b/src/openhuman/channels/proactive.rs index 1f41f5873d..fcc2cd5051 100644 --- a/src/openhuman/channels/proactive.rs +++ b/src/openhuman/channels/proactive.rs @@ -164,11 +164,18 @@ impl EventHandler for ProactiveMessageSubscriber { "[proactive] delivering to active external channel" ); - // ── External-effect approval gate (#1339) ───── + // ── External-effect approval gate (#1339, #2135) ─ // Proactive sends to Telegram/Discord/Slack/etc. // are outbound writes — route through the gate // before handing off to the channel implementation. - // Web delivery above is internal and exempt. + // Web delivery above is internal and exempt. When + // the gate persists an approval row, we keep its + // `request_id` so we can record the delivery + // outcome after `ch.send` returns (issue #2135). + let mut approval_request_id: Option = None; + let mut approval_gate_for_audit: Option< + std::sync::Arc, + > = None; if let Some(gate) = crate::openhuman::approval::ApprovalGate::try_global() { let summary = format!( "proactive-send to {key} ({} chars)", @@ -179,11 +186,16 @@ impl EventHandler for ProactiveMessageSubscriber { "source": source.to_string(), "message_chars": message.chars().count(), }); - match gate - .intercept("channels.proactive_send", &summary, redacted) - .await - { - crate::openhuman::approval::GateOutcome::Allow => {} + let (outcome, request_id) = gate + .intercept_audited("channels.proactive_send", &summary, redacted) + .await; + match outcome { + crate::openhuman::approval::GateOutcome::Allow => { + approval_request_id = request_id; + if approval_request_id.is_some() { + approval_gate_for_audit = Some(gate); + } + } crate::openhuman::approval::GateOutcome::Deny { reason } => { tracing::warn!( source = %source, @@ -196,7 +208,26 @@ impl EventHandler for ProactiveMessageSubscriber { } } - match ch.send(&SendMessage::new(message, "")).await { + let send_result = ch.send(&SendMessage::new(message, "")).await; + // Record the terminal status on the approval audit + // row before we log the outcome — best-effort, see + // #2135. `record_execution` itself logs write + // errors so we don't pile on here. + if let (Some(gate), Some(req_id)) = ( + approval_gate_for_audit.as_ref(), + approval_request_id.as_ref(), + ) { + let (exec_outcome, err_text) = match &send_result { + Ok(()) => (crate::openhuman::approval::ExecutionOutcome::Success, None), + Err(e) => ( + crate::openhuman::approval::ExecutionOutcome::Failure, + Some(e.to_string()), + ), + }; + gate.record_execution(req_id, exec_outcome, err_text.as_deref()); + } + + match send_result { Ok(()) => { tracing::debug!( source = %source,