Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/api/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,36 @@ use std::time::Duration;

use super::jwt::bearer_authorization_value;

/// Typed errors surfaced by `authed_json` for expected backend states that
/// callers should recover from in-flow rather than funnel into Sentry.
#[derive(Debug, thiserror::Error)]
pub enum BackendApiError {
/// Edit / delete of a channel message returned 404. Happens when the
/// user deletes the message on the provider side (Telegram, Discord,
/// Slack, …) but our local `StreamingState` still has the id, or when
/// the backend GC'd the relay row before we got around to editing it.
/// Callers should clear stale state and skip the retry. Targets
/// `OPENHUMAN-TAURI-2Y` (~454 events on `/channels/telegram/messages/<id>`).
#[error("message not found on {provider}: {message_id}")]
MessageNotFound {
/// Channel provider segment (e.g. `"telegram"`, `"discord"`).
provider: String,
/// Provider-specific message id from the URL.
message_id: String,
},
}

/// Extract `(provider, message_id)` from a backend channel path of the
/// shape `/channels/<provider>/messages/<id>`. Returns `None` for paths
/// with a different segment count or non-`channels` first segment.
fn parse_message_path(path: &str) -> Option<(&str, &str)> {
let segments: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
if segments.len() == 4 && segments[0] == "channels" && segments[2] == "messages" {
return Some((segments[1], segments[3]));
}
None
}

const CLIENT_VERSION_HEADER_MAX_LEN: usize = 64;

fn sanitize_client_version(raw: &str) -> Option<String> {
Expand Down Expand Up @@ -471,6 +501,32 @@ impl BackendOAuthClient {
if !status.is_success() {
let status_code = status.as_u16();
let status_str = status_code.to_string();

// 404 on `/channels/<provider>/messages/<id>` is an expected
// state (user deleted the message provider-side, or backend
// GC'd the relay row) — not a code bug. Surface a typed
// `BackendApiError::MessageNotFound` so callers (`bus.rs`
// streaming/thinking/delete/final paths) can clear stale
// ids and skip retry, without funneling the 404 into
// `report_error`. Targets `OPENHUMAN-TAURI-2Y` (~454 events).
if status_code == 404 {
if let Some((provider, message_id)) = parse_message_path(url.path()) {
tracing::info!(
domain = "backend_api",
operation = "authed_json",
provider = provider,
message_id = message_id,
"[backend_api] message-not-found 404 on {} {} — surfacing typed error",
method.as_str(),
url.path(),
);
return Err(anyhow::Error::new(BackendApiError::MessageNotFound {
provider: provider.to_string(),
message_id: message_id.to_string(),
}));
}
}

// These are transient infrastructure errors (proxy/CDN/backend
// temporarily unavailable). They are not code bugs and callers already
// implement retry/disable logic, so skip Sentry to avoid noise.
Expand Down
88 changes: 87 additions & 1 deletion src/api/rest_tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use super::{key_bytes_from_string, sanitize_client_version, BackendOAuthClient};
use super::{key_bytes_from_string, sanitize_client_version, BackendApiError, BackendOAuthClient};
use axum::extract::State;
use axum::http::HeaderMap;
use axum::routing::{get, post};
use axum::{Json, Router};
use base64::engine::general_purpose::{STANDARD, URL_SAFE_NO_PAD};
use base64::Engine;
use reqwest::Method;
use serde_json::{json, Value};
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
Expand Down Expand Up @@ -268,3 +269,88 @@ async fn backend_raw_client_inherits_x_core_version_default_header() {
sanitize_client_version(env!("CARGO_PKG_VERSION")).unwrap()
);
}

#[tokio::test]
async fn authed_json_surfaces_message_not_found_on_404() {
let app = Router::new()
.route(
"/channels/telegram/messages/1103",
post(|| async { (axum::http::StatusCode::NOT_FOUND, "Not Found") }),
)
.route(
"/channels/discord/messages/abc",
post(|| async { (axum::http::StatusCode::NOT_FOUND, "Not Found") }),
);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});

let base_url = format!("http://{addr}");
let client = BackendOAuthClient::new(&base_url).unwrap();

// Telegram path — matches OPENHUMAN-TAURI-2Y shape.
let err = client
.authed_json(
"mock-jwt",
Method::POST,
"/channels/telegram/messages/1103",
None,
)
.await
.unwrap_err();
let typed = err.downcast_ref::<BackendApiError>().unwrap();
let BackendApiError::MessageNotFound {
provider,
message_id,
} = typed;
assert_eq!(provider, "telegram");
assert_eq!(message_id, "1103");

// Discord path — proves the helper is provider-agnostic.
let err = client
.authed_json(
"mock-jwt",
Method::POST,
"/channels/discord/messages/abc",
None,
)
.await
.unwrap_err();
let typed = err.downcast_ref::<BackendApiError>().unwrap();
let BackendApiError::MessageNotFound {
provider,
message_id,
} = typed;
assert_eq!(provider, "discord");
assert_eq!(message_id, "abc");
}

#[tokio::test]
async fn authed_json_404_outside_messages_path_still_reports() {
// 404 on a non-`/channels/<provider>/messages/<id>` path should NOT be
// demoted to MessageNotFound — it's a real backend bug or routing
// mistake and must keep its Sentry signal.
let app = Router::new().route(
"/auth/profile",
get(|| async { (axum::http::StatusCode::NOT_FOUND, "Not Found") }),
);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});

let base_url = format!("http://{addr}");
let client = BackendOAuthClient::new(&base_url).unwrap();

let err = client
.authed_json("mock-jwt", Method::GET, "/auth/profile", None)
.await
.unwrap_err();
assert!(
err.downcast_ref::<BackendApiError>().is_none(),
"non-channel-message 404 must not be classified as MessageNotFound"
);
}
91 changes: 68 additions & 23 deletions src/openhuman/channels/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,18 @@ async fn flush_streaming_edit(channel: &str, state: &mut StreamingState) {
}
Err(err) => {
state.edit_failures += 1;
if let Some(crate::api::rest::BackendApiError::MessageNotFound { .. }) =
err.downcast_ref::<crate::api::rest::BackendApiError>()
{
tracing::info!(
"[channel-inbound][stream] edit channel='{}' msg_id={} — message gone provider-side (404), clearing stale id and disabling further edits",
channel,
message_id,
);
state.message_id = None;
state.edit_disabled = true;
return;
}
tracing::warn!(
"[channel-inbound][stream] edit failed channel='{}' msg_id={} err={} (failures={}/{})",
channel,
Expand Down Expand Up @@ -545,16 +557,28 @@ async fn flush_thinking_message(channel: &str, state: &mut StreamingState) {
return;
};

if let Some(ref msg_id) = state.thinking_message_id {
if let Some(msg_id) = state.thinking_message_id.clone() {
// Edit existing thinking message with updated content.
let body = json!({ "text": text });
if let Err(err) = client.send_channel_edit(channel, msg_id, &jwt, body).await {
tracing::debug!(
"[channel-inbound][thinking] edit failed channel='{}' msg_id={} err={}",
channel,
msg_id,
err,
);
if let Err(err) = client.send_channel_edit(channel, &msg_id, &jwt, body).await {
if let Some(crate::api::rest::BackendApiError::MessageNotFound { .. }) =
err.downcast_ref::<crate::api::rest::BackendApiError>()
{
tracing::info!(
"[channel-inbound][thinking] edit channel='{}' msg_id={} — thinking msg gone provider-side (404), clearing id and disabling further thinking edits",
channel,
msg_id,
);
state.thinking_message_id = None;
state.thinking_edit_disabled = true;
} else {
tracing::debug!(
"[channel-inbound][thinking] edit failed channel='{}' msg_id={} err={}",
channel,
msg_id,
err,
);
}
}
} else {
// Send initial thinking message.
Expand Down Expand Up @@ -694,12 +718,22 @@ async fn delete_channel_message(channel: &str, message_id: &str) {
);
}
Err(err) => {
tracing::warn!(
"[channel-inbound] failed to delete ephemeral msg channel='{}' msg_id={} err={}",
channel,
message_id,
err,
);
if let Some(crate::api::rest::BackendApiError::MessageNotFound { .. }) =
err.downcast_ref::<crate::api::rest::BackendApiError>()
{
tracing::info!(
"[channel-inbound] delete channel='{}' msg_id={} — message already gone provider-side (404), nothing to clean up",
channel,
message_id,
);
} else {
tracing::warn!(
"[channel-inbound] failed to delete ephemeral msg channel='{}' msg_id={} err={}",
channel,
message_id,
err,
);
}
}
}
}
Expand Down Expand Up @@ -742,15 +776,26 @@ async fn finalize_channel_reply(channel: &str, state: &mut StreamingState, final
);
}
Err(err) => {
tracing::warn!(
"[channel-inbound] final edit failed channel='{}' msg_id={} err={} — deleting orphan draft and sending fresh atomic reply so user still sees the canonical response",
channel,
message_id,
err,
);
let orphan = message_id.clone();
delete_channel_message(channel, &orphan).await;
send_channel_reply(channel, final_text).await;
if let Some(crate::api::rest::BackendApiError::MessageNotFound { .. }) =
err.downcast_ref::<crate::api::rest::BackendApiError>()
{
tracing::info!(
"[channel-inbound] final edit channel='{}' msg_id={} — draft already gone provider-side (404), sending fresh atomic reply",
channel,
message_id,
);
send_channel_reply(channel, final_text).await;
} else {
tracing::warn!(
"[channel-inbound] final edit failed channel='{}' msg_id={} err={} — deleting orphan draft and sending fresh atomic reply so user still sees the canonical response",
channel,
message_id,
err,
);
let orphan = message_id.clone();
delete_channel_message(channel, &orphan).await;
send_channel_reply(channel, final_text).await;
}
}
}
} else {
Expand Down
Loading