Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 15 additions & 65 deletions src/openhuman/composio/auth_retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ use std::time::Duration;
use super::client::ComposioClient;
use super::types::ComposioExecuteResponse;

/// Literal error strings Composio's gateway emits during the post-OAuth
/// readiness gap. Matching is `error.contains(needle)` so trailing
/// punctuation and capitalisation drift on the gateway side does not
/// silently disable the retry.
pub(crate) const RETRYABLE_AUTH_ERRORS: &[&str] = &["Connection error, try to authenticate"];

/// Backoff before the single retry. 8s sits in the middle of the 5-10s
/// recommendation in issue #1688 — long enough for Composio's action
/// gateway to sync the token, short enough that a genuine auth failure
Expand Down Expand Up @@ -69,72 +63,28 @@ pub(crate) async fn execute_with_auth_retry_inner(
args: Option<serde_json::Value>,
backoff: Duration,
) -> anyhow::Result<ComposioExecuteResponse> {
tracing::debug!(
target: "composio",
slug = %slug,
has_args = args.is_some(),
"[composio][auth_retry] execute start"
);
let first = client.execute_tool_once(slug, args.clone()).await?;
if first.successful {
let tool = slug.trim();
if tool.is_empty() {
tracing::debug!(
target: "composio",
slug = %slug,
"[composio][auth_retry] first attempt successful; no retry"
raw_slug_len = slug.len(),
"[composio][auth_retry] rejecting empty tool slug"
);
return Ok(first);
anyhow::bail!("composio.execute_tool: tool slug must not be empty");
}
let err_text = first.error.as_deref().unwrap_or("");
let matched = match_retryable_auth_error(err_text);
let Some(matched) = matched else {
// Surface the response unchanged. We deliberately do NOT log
// `err_text` here — upstream provider messages can embed
// identifiers (emails, file IDs, channel names) and a `warn`
// line at every non-retryable failure would broadcast them.
tracing::debug!(
target: "composio",
slug = %slug,
has_error = !err_text.is_empty(),
"[composio][auth_retry] non-retryable payload; returning first response"
);
return Ok(first);
};
tracing::warn!(
target: "composio",
slug = %slug,
retry_reason = matched,
sleep_ms = backoff.as_millis() as u64,
"[composio] post-OAuth auth error on first action call; sleeping and retrying once (#1688)"
);
tokio::time::sleep(backoff).await;
let second = client.execute_tool_once(slug, args).await?;
let arguments = args.unwrap_or(serde_json::Value::Object(Default::default()));
let has_args = arguments.as_object().is_some_and(|a| !a.is_empty());
let body = serde_json::json!({ "tool": tool, "arguments": arguments });

tracing::debug!(
target: "composio",
slug = %slug,
successful = second.successful,
"[composio][auth_retry] retry attempt completed"
slug = %tool,
has_args,
"[composio][auth_retry] execute start"
);
Ok(second)
}

/// Returns the matched needle (a static label safe to log) when the
/// provider error text matches one of [`RETRYABLE_AUTH_ERRORS`]. Match
/// is case-insensitive so capitalisation drift on Composio's side does
/// not silently disable the retry.
fn match_retryable_auth_error(err: &str) -> Option<&'static str> {
if err.is_empty() {
return None;
}
let err_lc = err.to_ascii_lowercase();
RETRYABLE_AUTH_ERRORS
.iter()
.copied()
.find(|needle| err_lc.contains(&needle.to_ascii_lowercase()))
}

#[cfg(test)]
fn is_retryable_auth_error(err: &str) -> bool {
match_retryable_auth_error(err).is_some()
client
Comment thread
senamakel marked this conversation as resolved.
.execute_tool_with_post_oauth_retry(tool, &body, backoff)
.await
}

#[cfg(test)]
Expand Down
26 changes: 0 additions & 26 deletions src/openhuman/composio/auth_retry_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,29 +255,3 @@ async fn retries_once_only_even_when_second_call_still_errors() {
expected 2 (single-layer) or 4 (outer auth_retry.rs #1708 × inner execute_tool_with_post_oauth_retry #1707)."
);
}

#[test]
fn is_retryable_auth_error_matches_known_string() {
assert!(super::is_retryable_auth_error(
"Connection error, try to authenticate"
));
// Tolerates wrapping text — Composio sometimes wraps the message
// in a longer envelope.
assert!(super::is_retryable_auth_error(
"Action failed: Connection error, try to authenticate (gateway code 401)"
));
// Tolerates capitalisation drift on the gateway side.
assert!(super::is_retryable_auth_error(
"CONNECTION ERROR, TRY TO AUTHENTICATE"
));
assert!(super::is_retryable_auth_error(
"connection error, try to authenticate"
));
}

#[test]
fn is_retryable_auth_error_rejects_unrelated_messages() {
assert!(!super::is_retryable_auth_error("invalid_grant"));
assert!(!super::is_retryable_auth_error("ratelimited"));
assert!(!super::is_retryable_auth_error(""));
}
8 changes: 6 additions & 2 deletions src/openhuman/composio/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ use super::types::{
};

const POST_OAUTH_ACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
/// Literal error fragments Composio's gateway emits during the post-OAuth
/// readiness gap. Matching is case-insensitive and substring-based so
/// trailing punctuation or wrapper text from the gateway does not silently
/// disable the retry.
const POST_OAUTH_AUTH_ERROR_STRINGS: &[&str] = &["connection error, try to authenticate"];

/// High-level client for all backend-proxied Composio operations.
Expand Down Expand Up @@ -200,7 +204,7 @@ impl ComposioClient {
.await
}

async fn execute_tool_with_post_oauth_retry(
pub(super) async fn execute_tool_with_post_oauth_retry(
&self,
tool: &str,
body: &serde_json::Value,
Expand Down Expand Up @@ -511,7 +515,7 @@ fn is_post_oauth_auth_readiness_error(resp: &ComposioExecuteResponse) -> bool {
let normalized = error.trim().to_ascii_lowercase();
POST_OAUTH_AUTH_ERROR_STRINGS
.iter()
.any(|needle| normalized == *needle)
.any(|needle| normalized.contains(needle))
}

/// Backend-mode [`ComposioClient`] constructor. **Internal to the
Expand Down
47 changes: 47 additions & 0 deletions src/openhuman/composio/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,53 @@ async fn execute_tool_retries_once_on_post_oauth_auth_readiness_error() {
assert_eq!(attempts.load(Ordering::SeqCst), 2);
}

#[test]
fn post_oauth_auth_readiness_error_matches_known_gateway_variants() {
for err in [
"Connection error, try to authenticate",
"connection error, try to authenticate",
"CONNECTION ERROR, TRY TO AUTHENTICATE",
"Action failed: Connection error, try to authenticate (gateway code 401)",
] {
assert!(
is_post_oauth_auth_readiness_error(&ComposioExecuteResponse {
data: json!({}),
successful: false,
error: Some(err.to_string()),
cost_usd: 0.0,
markdown_formatted: None,
}),
"should classify retryable Composio auth-readiness error: {err}"
);
}
}

#[test]
fn post_oauth_auth_readiness_error_rejects_unrelated_or_successful_payloads() {
for err in ["invalid_grant", "ratelimited", ""] {
assert!(
!is_post_oauth_auth_readiness_error(&ComposioExecuteResponse {
data: json!({}),
successful: false,
error: Some(err.to_string()),
cost_usd: 0.0,
markdown_formatted: None,
}),
"should not classify unrelated error as retryable: {err}"
);
}

assert!(!is_post_oauth_auth_readiness_error(
&ComposioExecuteResponse {
data: json!({}),
successful: true,
error: Some("Connection error, try to authenticate".to_string()),
cost_usd: 0.0,
markdown_formatted: None,
}
));
}

#[tokio::test]
async fn execute_tool_does_not_retry_other_auth_errors() {
let attempts = Arc::new(AtomicUsize::new(0));
Expand Down
Loading