diff --git a/app/src/lib/composio/formatters.test.ts b/app/src/lib/composio/formatters.test.ts index 947b42485e..a059f71b37 100644 --- a/app/src/lib/composio/formatters.test.ts +++ b/app/src/lib/composio/formatters.test.ts @@ -1,6 +1,59 @@ import { describe, expect, it } from 'vitest'; -import { formatTriggerLabel } from './formatters'; +import { formatComposioToolError, formatTriggerLabel } from './formatters'; + +describe('formatComposioToolError', () => { + it('strips the classified prefix and returns the body', () => { + const raw = + '[composio:error:insufficient_scope] `GMAIL_FETCH_EMAILS` was rejected because the connected gmail account is missing required permissions.'; + expect(formatComposioToolError(raw)).toContain('missing required permissions'); + expect(formatComposioToolError(raw)).not.toContain('[composio:error:'); + }); + + it('passes through unclassified messages', () => { + expect(formatComposioToolError('plain failure')).toBe('plain failure'); + }); + + it('returns empty string for null/undefined/empty input', () => { + expect(formatComposioToolError(null)).toBe(''); + expect(formatComposioToolError(undefined)).toBe(''); + expect(formatComposioToolError('')).toBe(''); + }); + + it('falls back to validation copy when body is empty', () => { + expect(formatComposioToolError('[composio:error:validation]')).toBe('Invalid tool arguments.'); + }); + + it('falls back to insufficient_scope copy when body is empty', () => { + expect(formatComposioToolError('[composio:error:insufficient_scope]')).toBe( + 'Reconnect this integration and grant the requested permissions.' + ); + }); + + it('falls back to rate_limited copy when body is empty', () => { + expect(formatComposioToolError('[composio:error:rate_limited]')).toBe( + 'The upstream service is rate-limiting requests. Try again shortly.' 
+ ); + }); + + it('falls back to gateway copy when body is empty', () => { + expect(formatComposioToolError('[composio:error:gateway]')).toBe( + 'Temporary connection issue. Try again in a moment.' + ); + }); + + it('returns body for unknown class when present', () => { + expect(formatComposioToolError('[composio:error:something_new] details here')).toBe( + 'details here' + ); + }); + + it('falls back to trimmed raw for unknown class with empty body', () => { + expect(formatComposioToolError(' [composio:error:something_new] ')).toBe( + '[composio:error:something_new]' + ); + }); +}); describe('formatTriggerLabel', () => { it('formats GOOGLECALENDAR_GOOGLE_CALENDAR_EVENT_CREATED_TRIGGER correctly', () => { diff --git a/app/src/lib/composio/formatters.ts b/app/src/lib/composio/formatters.ts index 54dd78afcd..b19e91a88f 100644 --- a/app/src/lib/composio/formatters.ts +++ b/app/src/lib/composio/formatters.ts @@ -11,6 +11,29 @@ * 4. dedupe leading provider prefix when it reappears * 5. split on _, title-case each token, join with space */ +/** + * Parse a classified Composio error (`[composio:error:] …`) for UI copy. + */ +export function formatComposioToolError(raw: string | null | undefined): string { + if (!raw) return ''; + const match = /^\[composio:error:([a-z_]+)\]\s*(.*)$/is.exec(raw.trim()); + if (!match) return raw.trim(); + + const [, className, body] = match; + switch (className) { + case 'validation': + return body || 'Invalid tool arguments.'; + case 'insufficient_scope': + return body || 'Reconnect this integration and grant the requested permissions.'; + case 'rate_limited': + return body || 'The upstream service is rate-limiting requests. Try again shortly.'; + case 'gateway': + return body || 'Temporary connection issue. 
Try again in a moment.'; + default: + return body || raw.trim(); + } +} + export function formatTriggerLabel( slug: string | null | undefined, opts?: { overrides?: Record } diff --git a/src/openhuman/composio/action_tool.rs b/src/openhuman/composio/action_tool.rs index dc9fb80b9f..cdab7fbb7e 100644 --- a/src/openhuman/composio/action_tool.rs +++ b/src/openhuman/composio/action_tool.rs @@ -29,7 +29,7 @@ use std::sync::Arc; use async_trait::async_trait; use serde_json::Value; -use super::client::{create_composio_client, direct_execute, ComposioClientKind}; +use super::client::create_composio_client; use super::providers::ToolScope; use super::tools::resolve_action_scope; use crate::openhuman::agent::harness::current_sandbox_mode; @@ -195,33 +195,18 @@ impl Tool for ComposioActionTool { }; let started = std::time::Instant::now(); - let res = match kind { - ComposioClientKind::Backend(client) => { - tracing::debug!( - tool = %self.action_name, - "[composio] per-action execute: backend variant" - ); - // Wrap with auth_retry so a stale tinyhumans-tenant - // JWT gets refreshed-and-replayed once before surfacing - // (upstream behaviour). - super::auth_retry::execute_with_auth_retry(&client, &self.action_name, args).await - } - ComposioClientKind::Direct(direct) => { - tracing::debug!( - tool = %self.action_name, - "[composio] per-action execute: direct variant" - ); - // Direct path skips auth_retry — see ComposioExecuteTool - // for rationale (no backend refresh surface). - direct_execute( - &direct, - &self.action_name, - args, - &live_config.composio.entity_id, - ) - .await - } - }; + // Route through the centralized dispatcher (#1797) so both + // backend and direct variants share the same prepare/retry/error- + // mapping pipeline. The dispatcher applies `format_provider_error` + // to failures (transport + provider) so downstream consumers can + // parse `[composio:error:] …`. 
+ let res = super::execute_dispatch::execute_composio_action_kind( + kind, + &self.action_name, + args, + &live_config.composio.entity_id, + ) + .await; let elapsed_ms = started.elapsed().as_millis() as u64; match res { @@ -267,7 +252,7 @@ impl Tool for ComposioActionTool { elapsed_ms, }, ); - Ok(ToolResult::error(format!("{}: {e}", self.action_name))) + Ok(ToolResult::error(e)) } } } diff --git a/src/openhuman/composio/client.rs b/src/openhuman/composio/client.rs index aa988cde04..8d45d605f5 100644 --- a/src/openhuman/composio/client.rs +++ b/src/openhuman/composio/client.rs @@ -166,7 +166,8 @@ impl ComposioClient { if tool.is_empty() { anyhow::bail!("composio.execute_tool_once: tool slug must not be empty"); } - let arguments = arguments.unwrap_or(serde_json::Value::Object(Default::default())); + let arguments = super::execute_prepare::prepare_execute_arguments(tool, arguments) + .map_err(anyhow::Error::msg)?; tracing::debug!(tool = %tool, "[composio] execute_tool_once (no built-in retry)"); let body = json!({ "tool": tool, "arguments": arguments }); let result = self.post_execute_tool(&body).await; @@ -183,7 +184,12 @@ impl ComposioClient { "[composio] execute_tool_once failed" ), } - result + result.map_err(|e| { + anyhow::Error::msg(super::error_mapping::remap_transport_error( + tool, + &e.to_string(), + )) + }) } /// `POST /agent-integrations/composio/execute` — run a Composio @@ -197,12 +203,26 @@ impl ComposioClient { if tool.is_empty() { anyhow::bail!("composio.execute_tool: tool slug must not be empty"); } - let mut arguments = arguments.unwrap_or(serde_json::Value::Object(Default::default())); - normalize_calendar_query_args(tool, &mut arguments); + // PR #1827 routes all execute-side argument normalization + // (including the bare-date → RFC 3339 fix #1802 brought to + // `normalize_calendar_query_args` on `main`) through the + // centralized `prepare_execute_arguments` helper. 
The helper + // covers the same calendar query case and is the shared entry + // point for `composio_execute`, per-action tools, and direct- + // mode dispatch. + let arguments = super::execute_prepare::prepare_execute_arguments(tool, arguments) + .map_err(anyhow::Error::msg)?; tracing::debug!(tool = %tool, "[composio] execute_tool"); let body = json!({ "tool": tool, "arguments": arguments }); - self.execute_tool_with_post_oauth_retry(tool, &body, POST_OAUTH_ACTION_RETRY_DELAY) - .await + let mut resp = self + .execute_tool_with_post_oauth_retry(tool, &body, POST_OAUTH_ACTION_RETRY_DELAY) + .await?; + if !resp.successful { + if let Some(ref err) = resp.error { + resp.error = Some(super::error_mapping::format_provider_error(tool, err)); + } + } + Ok(resp) } pub(super) async fn execute_tool_with_post_oauth_retry( @@ -506,58 +526,6 @@ impl ComposioClient { } } -/// Calendar query slugs whose `timeMin`/`timeMax` values should be -/// normalized to RFC 3339 timestamps. LLM-generated arguments sometimes -/// emit bare dates like `"2026-05-14"` instead of -/// `"2026-05-14T00:00:00Z"`, which Google Calendar rejects. -const CALENDAR_QUERY_SLUGS: &[&str] = &["GOOGLECALENDAR_EVENTS_LIST", "GOOGLECALENDAR_FIND_EVENT"]; - -/// Normalize `timeMin`/`timeMax` from bare dates to RFC 3339 for -/// Google Calendar query slugs. The LLM prompt instructs the model to -/// use RFC 3339 format, but some model invocations still produce bare -/// `YYYY-MM-DD` strings. 
-fn normalize_calendar_query_args(tool: &str, arguments: &mut serde_json::Value) { - if !CALENDAR_QUERY_SLUGS.contains(&tool) { - return; - } - let Some(map) = arguments.as_object_mut() else { - return; - }; - for key in &["timeMin", "timeMax"] { - if let Some(serde_json::Value::String(val)) = map.get(*key).cloned() { - if is_bare_date(&val) { - let normalized = format!("{}T00:00:00Z", val); - tracing::debug!( - tool = %tool, - key = %key, - normalized = %normalized, - "[composio] normalized bare date to RFC 3339 for calendar query" - ); - map.insert((*key).to_string(), serde_json::Value::String(normalized)); - } - } - } -} - -/// Returns `true` when `s` is a bare date string like `"2026-05-14"` -/// with no time component. -fn is_bare_date(s: &str) -> bool { - if s.len() != 10 { - return false; - } - let bytes = s.as_bytes(); - bytes[0].is_ascii_digit() - && bytes[1].is_ascii_digit() - && bytes[2].is_ascii_digit() - && bytes[3].is_ascii_digit() - && bytes[4] == b'-' - && bytes[5].is_ascii_digit() - && bytes[6].is_ascii_digit() - && bytes[7] == b'-' - && bytes[8].is_ascii_digit() - && bytes[9].is_ascii_digit() -} - fn is_post_oauth_auth_readiness_error(resp: &ComposioExecuteResponse) -> bool { if resp.successful { return false; diff --git a/src/openhuman/composio/client_tests.rs b/src/openhuman/composio/client_tests.rs index d0dcdf6c07..e40f3fac71 100644 --- a/src/openhuman/composio/client_tests.rs +++ b/src/openhuman/composio/client_tests.rs @@ -750,7 +750,9 @@ async fn execute_tool_surfaces_non_successful_provider_response() { ); let base = start_mock_backend(app).await; let client = build_client_for(base); - let resp = client.execute_tool("GMAIL_SEND_EMAIL", None).await.unwrap(); + // Use a slug that bypasses local arg validation in `execute_prepare` + // so the test exercises the gateway-response path, not the pre-flight. 
+ let resp = client.execute_tool("ANY_TOOL", None).await.unwrap(); assert!( !resp.successful, "non-successful provider response must be surfaced via the successful flag" @@ -854,103 +856,11 @@ async fn execute_tool_sends_tool_slug_in_request_body() { "tool slug must be forwarded in request body" ); } -// ── Calendar query argument normalization ─────────────────────── - -#[test] -fn is_bare_date_rejects_non_date_strings() { - assert!(!is_bare_date("")); - assert!(!is_bare_date("2026-05")); - assert!(!is_bare_date("2026-05-14T00:00:00Z")); - assert!(!is_bare_date("2026/05/14")); - assert!(!is_bare_date("hello-world")); - assert!(!is_bare_date("2026-5-14")); - assert!(!is_bare_date("2026-05-144")); -} - -#[test] -fn is_bare_date_accepts_valid_date_strings() { - assert!(is_bare_date("2026-05-14")); - assert!(is_bare_date("2025-01-01")); - assert!(is_bare_date("1999-12-31")); - assert!(is_bare_date("0001-01-01")); -} - -#[test] -fn normalize_calendar_query_args_ignores_non_calendar_slugs() { - let mut args = serde_json::json!({ "timeMin": "2026-05-14" }); - normalize_calendar_query_args("GMAIL_SEND_EMAIL", &mut args); - assert_eq!(args["timeMin"], "2026-05-14"); -} - -#[test] -fn normalize_calendar_query_args_converts_bare_date_to_rfc3339() { - let mut args = serde_json::json!({ - "connectionId": "conn-1", - "timeMin": "2026-05-14", - "timeMax": "2026-05-15", - }); - normalize_calendar_query_args("GOOGLECALENDAR_EVENTS_LIST", &mut args); - assert_eq!(args["timeMin"], "2026-05-14T00:00:00Z"); - assert_eq!(args["timeMax"], "2026-05-15T00:00:00Z"); - assert_eq!(args["connectionId"], "conn-1"); -} - -#[test] -fn normalize_calendar_query_args_preserves_rfc3339_timestamp() { - let mut args = serde_json::json!({ - "timeMin": "2026-05-14T00:00:00+05:30", - "timeMax": "2026-05-14T23:59:59Z", - }); - normalize_calendar_query_args("GOOGLECALENDAR_EVENTS_LIST", &mut args); - assert_eq!(args["timeMin"], "2026-05-14T00:00:00+05:30"); - assert_eq!(args["timeMax"], 
"2026-05-14T23:59:59Z"); -} - -#[test] -fn normalize_calendar_query_args_handles_missing_time_fields() { - let mut args = serde_json::json!({ "connectionId": "conn-1" }); - normalize_calendar_query_args("GOOGLECALENDAR_EVENTS_LIST", &mut args); - assert_eq!(args["connectionId"], "conn-1"); - // timeMin/timeMax should not be inserted if absent - assert!(args.get("timeMin").is_none()); -} - -#[test] -fn normalize_calendar_query_args_handles_non_object_arguments() { - let mut args = serde_json::json!("just a string"); - normalize_calendar_query_args("GOOGLECALENDAR_EVENTS_LIST", &mut args); - assert_eq!(args, "just a string"); -} - -#[test] -fn normalize_calendar_query_args_handles_calendar_find_event_slug() { - let mut args = serde_json::json!({ "timeMin": "2026-06-01" }); - normalize_calendar_query_args("GOOGLECALENDAR_FIND_EVENT", &mut args); - assert_eq!(args["timeMin"], "2026-06-01T00:00:00Z"); -} - -#[test] -fn normalize_calendar_query_args_normalizes_one_side_when_other_absent() { - // Asymmetric: only timeMin present, timeMax missing. The normalizer - // must convert timeMin without inserting a synthetic timeMax. - let mut args = serde_json::json!({ "timeMin": "2026-05-14" }); - normalize_calendar_query_args("GOOGLECALENDAR_EVENTS_LIST", &mut args); - assert_eq!(args["timeMin"], "2026-05-14T00:00:00Z"); - assert!(args.get("timeMax").is_none()); -} - -#[test] -fn normalize_calendar_query_args_skips_non_string_values() { - // Non-string values (numbers, bools, nulls, objects) must be left - // untouched — the normalizer only rewrites string bare-date inputs. 
- let mut args = serde_json::json!({ - "timeMin": 42, - "timeMax": null, - }); - normalize_calendar_query_args("GOOGLECALENDAR_EVENTS_LIST", &mut args); - assert_eq!(args["timeMin"], 42); - assert!(args["timeMax"].is_null()); -} +// Calendar bare-date → RFC 3339 normalization is now covered by +// `execute_prepare::prepare_execute_arguments` (PR #1827); see +// `execute_prepare_tests.rs` for the equivalent test surface that +// supersedes the per-slug `normalize_calendar_query_args` helper +// removed alongside the upstream-main merge. // ── Factory tests (`create_composio_client`) ──────────────────────── // diff --git a/src/openhuman/composio/error_mapping.rs b/src/openhuman/composio/error_mapping.rs new file mode 100644 index 0000000000..46df293db6 --- /dev/null +++ b/src/openhuman/composio/error_mapping.rs @@ -0,0 +1,214 @@ +//! Classify and format Composio tool failures so validation, scope, and +//! upstream-provider errors are not surfaced as generic gateway (502) failures. +//! +//! Issue #1797 — Composio support found tool-level failures on their side while +//! OpenHuman was bucketing them as HTTP 502 / gateway instability. + +/// Stable, grep-friendly error classes for metrics and UI routing. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ComposioErrorClass { + Validation, + InsufficientScope, + RateLimited, + UpstreamProvider, + ComposioPlatform, + Gateway, + Other, +} + +impl ComposioErrorClass { + pub fn as_str(self) -> &'static str { + match self { + Self::Validation => "validation", + Self::InsufficientScope => "insufficient_scope", + Self::RateLimited => "rate_limited", + Self::UpstreamProvider => "upstream_provider", + Self::ComposioPlatform => "composio_platform", + Self::Gateway => "gateway", + Self::Other => "other", + } + } +} + +pub fn classify_composio_error(tool: &str, message: &str) -> ComposioErrorClass { + let lower = message.to_ascii_lowercase(); + let class = if is_validation_shape(&lower) { + ComposioErrorClass::Validation + } else if is_insufficient_scope_shape(&lower) { + ComposioErrorClass::InsufficientScope + } else if is_rate_limited_shape(&lower) { + ComposioErrorClass::RateLimited + } else if is_gateway_transport_shape(&lower) && !is_embedded_provider_failure(&lower) { + ComposioErrorClass::Gateway + } else if is_composio_platform_shape(&lower) { + ComposioErrorClass::ComposioPlatform + } else if tool.starts_with("GMAIL_") + || tool.starts_with("SLACK_") + || tool.starts_with("NOTION_") + || tool.starts_with("GOOGLECALENDAR_") + { + ComposioErrorClass::UpstreamProvider + } else { + ComposioErrorClass::Other + }; + tracing::debug!( + tool = %tool, + class = class.as_str(), + "[composio][classify] error classified" + ); + class +} + +pub fn format_provider_error(tool: &str, raw: &str) -> String { + let class = classify_composio_error(tool, raw); + let detail = raw.trim(); + let body = match class { + ComposioErrorClass::Validation => format!("Invalid arguments for `{tool}`: {detail}"), + ComposioErrorClass::InsufficientScope => format_insufficient_scope_message(tool, detail), + ComposioErrorClass::RateLimited => format_rate_limited_message(tool, detail), + ComposioErrorClass::UpstreamProvider => { + format!("`{tool}` 
failed at the connected provider: {detail}") + } + ComposioErrorClass::ComposioPlatform => { + format!("Composio connection issue for `{tool}`: {detail}") + } + ComposioErrorClass::Gateway => { + format!("Temporary gateway error while calling `{tool}`: {detail}") + } + ComposioErrorClass::Other => format!("`{tool}` failed: {detail}"), + }; + prefix_class(class, &body) +} + +pub fn remap_transport_error(tool: &str, raw: &str) -> String { + let detail = extract_transport_detail(raw); + let class = if is_embedded_provider_failure(&detail) { + classify_composio_error(tool, &detail) + } else if is_gateway_transport_shape(raw) { + ComposioErrorClass::Gateway + } else { + classify_composio_error(tool, raw) + }; + let body = match class { + ComposioErrorClass::InsufficientScope => format_insufficient_scope_message(tool, &detail), + ComposioErrorClass::RateLimited => format_rate_limited_message(tool, &detail), + ComposioErrorClass::Gateway => format!( + "Temporary gateway error while calling `{tool}`: {}", + summarize_gateway(raw) + ), + ComposioErrorClass::Validation => format!("Invalid arguments for `{tool}`: {detail}"), + ComposioErrorClass::UpstreamProvider => { + format!("`{tool}` failed at the connected provider: {detail}") + } + ComposioErrorClass::ComposioPlatform => { + format!("Composio connection issue for `{tool}`: {detail}") + } + ComposioErrorClass::Other => format!("`{tool}` failed: {detail}"), + }; + prefix_class(class, &body) +} + +fn prefix_class(class: ComposioErrorClass, body: &str) -> String { + format!("[composio:error:{}] {}", class.as_str(), body) +} + +fn format_insufficient_scope_message(tool: &str, detail: &str) -> String { + let toolkit = tool + .split('_') + .next() + .unwrap_or("integration") + .to_ascii_lowercase(); + format!( + "`{tool}` was rejected because the connected {toolkit} account is missing required \ + permissions ({detail}). Reconnect the integration in Settings → Skills and grant the \ + scopes requested during OAuth." 
+ ) +} + +fn format_rate_limited_message(tool: &str, detail: &str) -> String { + format!( + "`{tool}` hit an upstream rate limit ({detail}). Wait a minute and retry, or reduce \ + call frequency — this is not an OpenHuman gateway outage." + ) +} + +fn is_validation_shape(lower: &str) -> bool { + lower.contains("invalid arguments") + || lower.contains("missing required") + || lower.contains("must not be empty") + || lower.contains("required field") + || lower.contains("bad request") + || lower.contains("invalid date") + || lower.contains("rfc 3339") + || lower.contains("timemax") + || lower.contains("timemin") +} + +fn is_insufficient_scope_shape(lower: &str) -> bool { + lower.contains("insufficient authentication scopes") + || lower.contains("insufficient scope") + || lower.contains("insufficient permissions") + || (lower.contains("403") && lower.contains("scope")) + || lower.contains("invalid oauth scope") +} + +fn is_rate_limited_shape(lower: &str) -> bool { + lower.contains("rate limit") + || lower.contains("rate_limit") + || lower.contains("ratelimited") + || lower.contains("too many requests") + || lower.contains("429") +} + +fn is_composio_platform_shape(lower: &str) -> bool { + lower.contains("connection error, try to authenticate") + || lower.contains("not enabled") + || lower.contains("not connected") + || lower.contains("token revoked") +} + +fn is_gateway_transport_shape(lower: &str) -> bool { + lower.contains("backend returned 502") + || lower.contains("502 bad gateway") + || lower.contains("backend returned 503") + || lower.contains("backend returned 504") + || lower.contains("(502 ") + || lower.contains("(503 ") + || lower.contains("(504 ") +} + +fn is_embedded_provider_failure(lower: &str) -> bool { + is_validation_shape(lower) + || is_insufficient_scope_shape(lower) + || is_rate_limited_shape(lower) + || is_composio_platform_shape(lower) + || lower.contains("composio") + || lower.contains("google") + || lower.contains("slack") + || 
lower.contains("notion") + || lower.contains("gmail") + || lower.contains("fetch_type") + || lower.contains("timemax") + || lower.contains("timemin") +} + +fn extract_transport_detail(raw: &str) -> String { + raw.split_once(": ") + .map(|(_, tail)| tail.to_string()) + .unwrap_or_else(|| raw.to_string()) +} + +fn summarize_gateway(raw: &str) -> String { + if let Some(idx) = raw.find("Backend returned ") { + let rest = &raw[idx..]; + if let Some(colon) = rest.rfind(": ") { + return rest[colon + 2..].trim().to_string(); + } + return rest.trim().to_string(); + } + raw.trim().to_string() +} + +#[cfg(test)] +#[path = "error_mapping_tests.rs"] +mod tests; diff --git a/src/openhuman/composio/error_mapping_tests.rs b/src/openhuman/composio/error_mapping_tests.rs new file mode 100644 index 0000000000..4933c2d959 --- /dev/null +++ b/src/openhuman/composio/error_mapping_tests.rs @@ -0,0 +1,44 @@ +use super::{classify_composio_error, remap_transport_error, ComposioErrorClass}; + +#[test] +fn classifies_gmail_insufficient_scope() { + let msg = "HTTP 403: Request had insufficient authentication scopes."; + assert_eq!( + classify_composio_error("GMAIL_FETCH_EMAILS", msg), + ComposioErrorClass::InsufficientScope + ); +} + +#[test] +fn classifies_slack_rate_limit() { + let msg = "Slack API error: ratelimited"; + assert_eq!( + classify_composio_error("SLACK_FETCH_CONVERSATION_HISTORY", msg), + ComposioErrorClass::RateLimited + ); +} + +#[test] +fn embedded_provider_failure_in_502_body_is_not_gateway() { + let raw = "Backend returned 502 Bad Gateway for POST https://api.example.com/agent-integrations/composio/execute: \ + timeMax must be RFC 3339 timestamp"; + let mapped = remap_transport_error("GOOGLECALENDAR_EVENTS_LIST", raw); + assert!( + mapped.contains("[composio:error:"), + "expected classified prefix, got: {mapped}" + ); + assert!( + !mapped.contains("[composio:error:gateway]"), + "provider-shaped 502 body must not be labeled gateway: {mapped}" + ); +} + +#[test] +fn 
true_gateway_stays_gateway_class() { + let raw = "Backend returned 502 Bad Gateway for POST https://api.example.com/x: upstream down"; + let mapped = remap_transport_error("GMAIL_SEND_EMAIL", raw); + assert!( + mapped.contains("[composio:error:gateway]"), + "expected gateway class, got: {mapped}" + ); +} diff --git a/src/openhuman/composio/execute_dispatch.rs b/src/openhuman/composio/execute_dispatch.rs new file mode 100644 index 0000000000..8ba5865c32 --- /dev/null +++ b/src/openhuman/composio/execute_dispatch.rs @@ -0,0 +1,188 @@ +//! Shared Composio execute path: prepare args, retry policy, error mapping (#1797). + +use std::time::Duration; + +use super::auth_retry::{execute_with_auth_retry_inner, AUTH_RETRY_BACKOFF}; +use super::client::{direct_execute, ComposioClient, ComposioClientKind}; +use super::error_mapping::{format_provider_error, remap_transport_error}; +use super::execute_prepare::prepare_execute_arguments; +use super::types::ComposioExecuteResponse; + +const SLACK_HISTORY: &str = "SLACK_FETCH_CONVERSATION_HISTORY"; +const RATELIMIT_INITIAL_BACKOFF: Duration = Duration::from_secs(2); +const RATELIMIT_MAX_BACKOFF: Duration = Duration::from_secs(30); +const RATELIMIT_MAX_ATTEMPTS: u32 = 6; + +pub async fn execute_composio_action( + client: &ComposioClient, + tool: &str, + arguments: Option, +) -> Result { + let tool = tool.trim(); + if tool.is_empty() { + return Err("composio: tool slug must not be empty".to_string()); + } + + let prepared = match prepare_execute_arguments(tool, arguments) { + Ok(args) => args, + Err(msg) => { + tracing::debug!( + tool = %tool, + error = %msg, + "[composio][prepare] local validation rejected execute" + ); + return Err(format_provider_error(tool, &msg)); + } + }; + + tracing::debug!(tool = %tool, "[composio][dispatch] execute_composio_action"); + let resp = match execute_with_retries(client, tool, prepared).await { + Ok(resp) => resp, + Err(e) => { + tracing::debug!(tool = %tool, "[composio][dispatch] transport 
failure"); + return Err(remap_transport_error(tool, &e.to_string())); + } + }; + + if resp.successful { + return Ok(resp); + } + + let raw_err = resp + .error + .clone() + .unwrap_or_else(|| "provider reported failure".to_string()); + Ok(ComposioExecuteResponse { + error: Some(format_provider_error(tool, &raw_err)), + ..resp + }) +} + +async fn execute_with_retries( + client: &ComposioClient, + tool: &str, + args: serde_json::Value, +) -> anyhow::Result { + let mut delay = RATELIMIT_INITIAL_BACKOFF; + for attempt in 1..=RATELIMIT_MAX_ATTEMPTS { + let resp = execute_with_auth_retry_inner( + client, + tool, + Some(args.clone()), + if attempt == 1 { + AUTH_RETRY_BACKOFF + } else { + Duration::ZERO + }, + ) + .await?; + + if resp.successful { + return Ok(resp); + } + + let err_text = resp.error.as_deref().unwrap_or(""); + // Only Slack's conversations.history is allow-listed for transparent + // rate-limit retries today: it surfaces 429s on bursty agent reads and + // has stable retry semantics. Other tools surface 429 to the caller + // (formatted as `[composio:error:rate_limited]`) instead of stalling. 
+ if tool == SLACK_HISTORY && is_rate_limited(err_text) && attempt < RATELIMIT_MAX_ATTEMPTS { + tracing::warn!( + tool = %tool, + attempt, + max_attempts = RATELIMIT_MAX_ATTEMPTS, + sleep_ms = delay.as_millis() as u64, + "[composio][dispatch] upstream rate limit; backing off (#1797)" + ); + tokio::time::sleep(delay).await; + delay = (delay * 2).min(RATELIMIT_MAX_BACKOFF); + continue; + } + + return Ok(resp); + } + unreachable!("loop returns on final attempt"); +} + +fn is_rate_limited(err: &str) -> bool { + let lower = err.to_ascii_lowercase(); + lower.contains("rate limit") + || lower.contains("rate_limit") + || lower.contains("ratelimited") + || lower.contains("too many requests") + || lower.contains("429") +} + +/// Mode-aware variant: routes through the backend (with auth-retry + +/// rate-limit backoff + error mapping) or the direct tenant client +/// (no auth-retry, but local validation + error mapping still apply). +/// +/// Added after #1710's mode-aware client split (`ComposioClientKind`) so +/// the per-action tool surface, dispatcher tool, and RPC `composio_execute` +/// op all share one entry point with consistent error semantics. 
+pub async fn execute_composio_action_kind( + kind: ComposioClientKind, + tool: &str, + arguments: Option, + entity_id: &str, +) -> Result { + let tool_trim = tool.trim(); + if tool_trim.is_empty() { + return Err("composio: tool slug must not be empty".to_string()); + } + + let prepared = match prepare_execute_arguments(tool_trim, arguments) { + Ok(args) => args, + Err(msg) => { + tracing::debug!( + tool = %tool_trim, + error = %msg, + "[composio][prepare] local validation rejected execute" + ); + return Err(format_provider_error(tool_trim, &msg)); + } + }; + + match kind { + ComposioClientKind::Backend(client) => { + tracing::debug!(tool = %tool_trim, "[composio][dispatch] backend variant"); + let resp = match execute_with_retries(&client, tool_trim, prepared).await { + Ok(resp) => resp, + Err(e) => { + tracing::debug!(tool = %tool_trim, "[composio][dispatch] transport failure"); + return Err(remap_transport_error(tool_trim, &e.to_string())); + } + }; + Ok(format_response(tool_trim, resp)) + } + ComposioClientKind::Direct(direct) => { + tracing::debug!(tool = %tool_trim, "[composio][dispatch] direct variant"); + // Direct path skips auth_retry — the user's stored Composio + // API key has no backend-side refresh surface and a 401 is a + // config issue that should surface immediately rather than + // retry-loop. Local validation + error mapping still apply. 
+ match direct_execute(&direct, tool_trim, Some(prepared), entity_id).await { + Ok(resp) => Ok(format_response(tool_trim, resp)), + Err(e) => Err(remap_transport_error(tool_trim, &e.to_string())), + } + } + } +} + +fn format_response(tool: &str, resp: ComposioExecuteResponse) -> ComposioExecuteResponse { + if resp.successful { + return resp; + } + let raw_err = resp + .error + .clone() + .unwrap_or_else(|| "provider reported failure".to_string()); + ComposioExecuteResponse { + error: Some(format_provider_error(tool, &raw_err)), + ..resp + } +} + +#[cfg(test)] +#[path = "execute_dispatch_tests.rs"] +mod tests; diff --git a/src/openhuman/composio/execute_dispatch_tests.rs b/src/openhuman/composio/execute_dispatch_tests.rs new file mode 100644 index 0000000000..22d2fa79c9 --- /dev/null +++ b/src/openhuman/composio/execute_dispatch_tests.rs @@ -0,0 +1,56 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use axum::{routing::post, Json, Router}; +use serde_json::json; + +use super::execute_composio_action; +use crate::openhuman::composio::client::ComposioClient; +use crate::openhuman::integrations::IntegrationClient; + +async fn start_mock_backend(app: Router) -> String { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + // The spawned task is intentionally orphaned; it dies with the tokio + // runtime when the test finishes. The previous oneshot-based graceful + // shutdown was broken because the sender was dropped immediately, + // signalling shutdown before the test could exercise the server. 
+ tokio::spawn(async move { + let _ = axum::serve(listener, app).await; + }); + format!("http://{addr}") +} + +fn build_client(base: &str) -> ComposioClient { + let inner = Arc::new(IntegrationClient::new( + base.to_string(), + "test-token".to_string(), + )); + ComposioClient::new(inner) +} + +#[tokio::test] +async fn local_validation_skips_network() { + let attempts = Arc::new(AtomicUsize::new(0)); + let app = Router::new().route( + "/agent-integrations/composio/execute", + post({ + let attempts = attempts.clone(); + move || async move { + attempts.fetch_add(1, Ordering::SeqCst); + Json(json!({"success": true, "data": {"successful": true, "data": {}, "costUsd": 0.0}})) + } + }), + ); + let base = start_mock_backend(app).await; + let client = build_client(&base); + let err = execute_composio_action( + &client, + "GMAIL_SEND_EMAIL", + Some(json!({ "subject": "hello" })), + ) + .await + .unwrap_err(); + assert!(err.contains("[composio:error:")); + assert_eq!(attempts.load(Ordering::SeqCst), 0); +} diff --git a/src/openhuman/composio/execute_prepare.rs b/src/openhuman/composio/execute_prepare.rs new file mode 100644 index 0000000000..f74218125f --- /dev/null +++ b/src/openhuman/composio/execute_prepare.rs @@ -0,0 +1,163 @@ +//! Normalize and validate Composio action arguments before dispatch (#1797). 
+ +use serde_json::{json, Map, Value}; + +pub fn prepare_execute_arguments(tool: &str, arguments: Option) -> Result { + let tool = tool.trim(); + let mut args = match arguments { + Some(Value::Object(map)) => Value::Object(map), + Some(Value::Null) | None => Value::Object(Map::new()), + Some(other) => { + return Err(format!( + "composio: `{tool}` arguments must be a JSON object, got {}", + other + )); + } + }; + + if tool.starts_with("GOOGLECALENDAR_") { + normalize_calendar_time_bounds(&mut args)?; + } + if tool == "NOTION_FETCH_DATA" { + ensure_notion_fetch_type(&mut args)?; + } + if tool == "GMAIL_SEND_EMAIL" { + validate_gmail_send_email(&args)?; + } + if tool == "GMAIL_ADD_LABEL_TO_EMAIL" { + validate_gmail_add_label(&args)?; + } + + Ok(args) +} + +fn normalize_calendar_time_bounds(args: &mut Value) -> Result<(), String> { + let Some(obj) = args.as_object_mut() else { + return Ok(()); + }; + for key in ["timeMin", "timeMax", "time_min", "time_max"] { + if let Some(v) = obj.get(key).cloned() { + if let Some(normalized) = normalize_rfc3339_bound(&v) { + obj.insert(key.to_string(), Value::String(normalized)); + } else if v.is_string() { + return Err(format!( + "GOOGLECALENDAR time bound `{key}` must be an RFC 3339 timestamp \ + (e.g. 2026-05-14T00:00:00Z), not a bare date" + )); + } + } + } + Ok(()) +} + +fn normalize_rfc3339_bound(value: &Value) -> Option { + let s = value.as_str()?.trim(); + if s.is_empty() { + return None; + } + if s.contains('T') { + return Some(s.to_string()); + } + // A bare date like `2026-05-14` is promoted to RFC 3339 midnight UTC. + // Parse explicitly so impossible dates such as `2026-99-99` are rejected + // up front instead of being passed through to Google Calendar. 
+ if chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d").is_ok() { + return Some(format!("{s}T00:00:00Z")); + } + None +} + +fn ensure_notion_fetch_type(args: &mut Value) -> Result<(), String> { + let Some(obj) = args.as_object_mut() else { + return Ok(()); + }; + let has_fetch_type = obj + .get("fetch_type") + .or_else(|| obj.get("fetchType")) + .and_then(|v| v.as_str()) + .map(str::trim) + .is_some_and(|s| !s.is_empty()); + if has_fetch_type { + return Ok(()); + } + let inferred = obj + .get("filter") + .and_then(|f| f.get("value").or_else(|| f.get("property"))) + .and_then(|v| v.as_str()) + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(|v| match v { + "page" | "pages" => "pages", + "database" | "databases" => "databases", + other => other, + }) + .unwrap_or("pages"); + tracing::debug!( + fetch_type = %inferred, + "[composio][prepare] NOTION_FETCH_DATA: inferred fetch_type" + ); + obj.insert("fetch_type".to_string(), json!(inferred)); + Ok(()) +} + +fn validate_gmail_send_email(args: &Value) -> Result<(), String> { + let Some(obj) = args.as_object() else { + return Err("GMAIL_SEND_EMAIL: arguments must be an object".to_string()); + }; + let recipient = obj + .get("to") + .or_else(|| obj.get("recipient_email")) + .or_else(|| obj.get("recipientEmail")) + .and_then(|v| v.as_str()) + .map(str::trim) + .filter(|s| !s.is_empty()); + if recipient.is_some() { + return Ok(()); + } + Err( + "GMAIL_SEND_EMAIL: `to` (or `recipient_email`) is required — cannot send without a recipient" + .to_string(), + ) +} + +fn validate_gmail_add_label(args: &Value) -> Result<(), String> { + let Some(obj) = args.as_object() else { + return Err("GMAIL_ADD_LABEL_TO_EMAIL: arguments must be an object".to_string()); + }; + let message_id = obj + .get("message_id") + .or_else(|| obj.get("messageId")) + .and_then(|v| v.as_str()) + .map(str::trim) + .filter(|s| !s.is_empty()); + if message_id.is_none() { + return Err("GMAIL_ADD_LABEL_TO_EMAIL: `message_id` is required".to_string()); + } + let 
add = non_empty_string_array(obj.get("add_label_ids").or_else(|| obj.get("addLabelIds"))); + let remove = non_empty_string_array( + obj.get("remove_label_ids") + .or_else(|| obj.get("removeLabelIds")), + ); + if add || remove { + return Ok(()); + } + Err( + "GMAIL_ADD_LABEL_TO_EMAIL: provide at least one non-empty label in `add_label_ids` or \ + `remove_label_ids`" + .to_string(), + ) +} + +fn non_empty_string_array(value: Option<&Value>) -> bool { + match value { + Some(Value::Array(items)) => items + .iter() + .any(|v| v.as_str().map(str::trim).is_some_and(|s| !s.is_empty())), + Some(Value::String(s)) => !s.trim().is_empty(), + _ => false, + } +} + +#[cfg(test)] +#[path = "execute_prepare_tests.rs"] +mod tests; diff --git a/src/openhuman/composio/execute_prepare_tests.rs b/src/openhuman/composio/execute_prepare_tests.rs new file mode 100644 index 0000000000..574e26c974 --- /dev/null +++ b/src/openhuman/composio/execute_prepare_tests.rs @@ -0,0 +1,41 @@ +use serde_json::json; + +use super::prepare_execute_arguments; + +#[test] +fn calendar_bare_date_becomes_rfc3339() { + let args = json!({ + "timeMin": "2026-05-14", + "timeMax": "2026-05-15" + }); + let prepared = prepare_execute_arguments("GOOGLECALENDAR_EVENTS_LIST", Some(args)).unwrap(); + assert_eq!(prepared["timeMin"], "2026-05-14T00:00:00Z"); + assert_eq!(prepared["timeMax"], "2026-05-15T00:00:00Z"); +} + +#[test] +fn notion_fetch_data_infers_fetch_type_from_filter() { + let args = json!({ + "filter": { "value": "page", "property": "object" }, + "page_size": 25 + }); + let prepared = prepare_execute_arguments("NOTION_FETCH_DATA", Some(args)).unwrap(); + assert_eq!(prepared["fetch_type"], "pages"); +} + +#[test] +fn gmail_send_requires_recipient() { + let err = prepare_execute_arguments("GMAIL_SEND_EMAIL", Some(json!({ "subject": "hi" }))) + .unwrap_err(); + assert!(err.contains("recipient") || err.contains("`to`")); +} + +#[test] +fn gmail_add_label_requires_label_ids() { + let err = 
prepare_execute_arguments( + "GMAIL_ADD_LABEL_TO_EMAIL", + Some(json!({ "message_id": "m1", "add_label_ids": [], "remove_label_ids": [] })), + ) + .unwrap_err(); + assert!(err.contains("add_label_ids") || err.contains("remove_label_ids")); +} diff --git a/src/openhuman/composio/mod.rs b/src/openhuman/composio/mod.rs index e59ffd383a..925fbc30f5 100644 --- a/src/openhuman/composio/mod.rs +++ b/src/openhuman/composio/mod.rs @@ -39,6 +39,9 @@ pub mod action_tool; pub mod auth_retry; pub mod bus; pub mod client; +pub mod error_mapping; +pub mod execute_dispatch; +pub mod execute_prepare; pub mod googlecalendar_args; pub mod ops; pub mod periodic; diff --git a/src/openhuman/composio/ops.rs b/src/openhuman/composio/ops.rs index cf720ee891..56279288e3 100644 --- a/src/openhuman/composio/ops.rs +++ b/src/openhuman/composio/ops.rs @@ -22,8 +22,8 @@ type OpResult = std::result::Result; use std::sync::Arc; use super::client::{ - build_composio_client, create_composio_client, direct_authorize, direct_execute, - direct_list_connections, direct_list_tools, ComposioClient, ComposioClientKind, + build_composio_client, create_composio_client, direct_authorize, direct_list_connections, + direct_list_tools, ComposioClient, ComposioClientKind, }; use super::providers::{ get_provider, ProviderContext, ProviderUserProfile, SyncOutcome, SyncReason, @@ -571,16 +571,18 @@ pub async fn composio_execute( // body preference, and cost-USD log line all stay uniform (#1710). 
+ let kind = create_composio_client(config).map_err(|e| format!("[composio] execute: {e}"))?; let started = std::time::Instant::now(); - let result = match kind { - ComposioClientKind::Backend(client) => { - tracing::debug!(tool = %tool, "[composio] execute: backend variant"); - client.execute_tool(tool, arguments).await - } - ComposioClientKind::Direct(direct) => { - tracing::debug!(tool = %tool, "[composio-direct] execute: direct variant"); - direct_execute(&direct, tool, arguments, &config.composio.entity_id).await - } - }; + // Centralized prepare → retry → error-mapping pipeline (#1797), + // mode-aware over the backend/direct split (#1710). The dispatcher + // returns pre-formatted `[composio:error:<class>] …` strings so the + // frontend formatter at `app/src/lib/composio/formatters.ts` can + // parse the class regardless of which mode produced the failure. + let result = super::execute_dispatch::execute_composio_action_kind( + kind, + tool, + arguments, + &config.composio.entity_id, + ) + .await; let elapsed_ms = started.elapsed().as_millis() as u64; match result { @@ -616,7 +618,21 @@ pub async fn composio_execute( }, ); report_composio_op_error("execute", &e); - Err(format!("[composio] execute failed: {e:#}")) + // Preserve already-classified errors from the dispatcher + // (`[composio:error:<class>] …`) so the frontend formatter at + // `app/src/lib/composio/formatters.ts` can still parse the class. 
+ let is_classified = e.starts_with("[composio:error:"); + tracing::debug!( + tool = %tool, + elapsed_ms, + classified = is_classified, + "[composio] rpc execute error mapped" + ); + if is_classified { + Err(e) + } else { + Err(format!("[composio] execute failed: {e}")) + } } } } diff --git a/src/openhuman/composio/ops_tests.rs b/src/openhuman/composio/ops_tests.rs index 64feeefd53..0b968fcc2e 100644 --- a/src/openhuman/composio/ops_tests.rs +++ b/src/openhuman/composio/ops_tests.rs @@ -405,7 +405,15 @@ async fn composio_execute_via_mock_propagates_backend_error() { let err = composio_execute(&config, "ANY_TOOL", None) .await .unwrap_err(); - assert!(err.contains("execute failed")); + // The dispatcher (`execute_composio_action`) classifies transport + // failures and prefixes them with `[composio:error:<class>] …`; ops.rs + // preserves that prefix so the frontend formatter can parse the class. + // For an unrecognised tool slug and a 502-shaped envelope the only + // signal we get is the backend error text, so assert on its contents. 
+ assert!( + err.starts_with("[composio:error:") && err.contains("rate limited"), + "got: {err}" + ); } #[tokio::test] diff --git a/src/openhuman/composio/tools.rs b/src/openhuman/composio/tools.rs index e398e3e2dd..5c98b4a498 100644 --- a/src/openhuman/composio/tools.rs +++ b/src/openhuman/composio/tools.rs @@ -34,9 +34,7 @@ use crate::openhuman::tools::traits::{ PermissionLevel, Tool, ToolCallOptions, ToolCategory, ToolResult, }; -use super::client::{ - create_composio_client, direct_execute, direct_list_connections, ComposioClientKind, -}; +use super::client::{create_composio_client, direct_list_connections, ComposioClientKind}; use super::providers::{ catalog_for_toolkit, classify_unknown, find_curated, get_provider, load_user_scope_or_default, toolkit_from_slug, ToolScope, UserScopePref, @@ -952,24 +950,15 @@ impl Tool for ComposioExecuteTool { }; let started = std::time::Instant::now(); - let res = match kind { - ComposioClientKind::Backend(client) => { - tracing::debug!(tool = %tool, "[composio] tool execute.execute: backend variant"); - // Backend path retains the upstream `auth_retry` wrapper - // so a 401 from a stale tinyhumans-tenant token is - // refreshed-and-replayed exactly once before surfacing. - super::auth_retry::execute_with_auth_retry(&client, &tool, arguments).await - } - ComposioClientKind::Direct(direct) => { - tracing::debug!(tool = %tool, "[composio] tool execute.execute: direct variant"); - // Direct path skips `auth_retry`: the user's stored - // Composio API key doesn't have a backend-side refresh - // surface and a 401 here is a config issue (rotated key, - // wrong key, deleted) that should surface to the user - // rather than retry-loop. - direct_execute(&direct, &tool, arguments, &live_config.composio.entity_id).await - } - }; + // Centralized prepare → retry → error-mapping pipeline (#1797), + // mode-aware over the backend/direct split (#1710). 
+ let res = super::execute_dispatch::execute_composio_action_kind( + kind, + &tool, + arguments, + &live_config.composio.entity_id, + ) + .await; let elapsed_ms = started.elapsed().as_millis() as u64; match res { Ok(resp) => { @@ -1012,7 +1001,7 @@ impl Tool for ComposioExecuteTool { elapsed_ms, }, ); - Ok(ToolResult::error(format!("composio_execute failed: {e}"))) + Ok(ToolResult::error(e)) } } } diff --git a/src/openhuman/socket/event_handlers.rs b/src/openhuman/socket/event_handlers.rs index abd6566d42..74d91da537 100644 --- a/src/openhuman/socket/event_handlers.rs +++ b/src/openhuman/socket/event_handlers.rs @@ -37,6 +37,23 @@ pub(super) fn handle_sio_event( event_name, payload.len() ); + // CodeRabbit #3250222027: even at debug level, raw bodies can leak + // PII / secrets / tokens. Log structural metadata (top-level shape + + // byte length) but never the raw text. + let payload_shape = match &data { + serde_json::Value::Object(map) => format!("object_keys={}", map.len()), + serde_json::Value::Array(arr) => format!("array_len={}", arr.len()), + serde_json::Value::String(_) => "string".to_string(), + serde_json::Value::Number(_) => "number".to_string(), + serde_json::Value::Bool(_) => "bool".to_string(), + serde_json::Value::Null => "null".to_string(), + }; + log::debug!( + "[socket] event payload: name={} data_bytes={} shape={} preview_omitted=true", + event_name, + payload.len(), + payload_shape + ); log::debug!("[socket] event dispatch: name={}", event_name); match event_name {